269 lines
11 KiB
Python
269 lines
11 KiB
Python
#
|
|
# Copyright (c) 2019 Wind River Systems, Inc.
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
#
|
|
import six
|
|
|
|
from cephclient.client import CephClient
|
|
from cephclient.exception import CephClientFunctionNotImplemented
|
|
from cephclient.exception import CephClientInvalidOsdIdValue
|
|
from cephclient.exception import CephClientTypeError
|
|
|
|
|
|
class CephWrapper(CephClient):
|
|
|
|
def __init__(self, endpoint=''):
|
|
super(CephWrapper, self).__init__()
|
|
|
|
def auth_import(self, body='json', timeout=None):
|
|
raise CephClientFunctionNotImplemented(name='auth_import')
|
|
|
|
def _sanitize_osdid_to_str(self, _id):
|
|
if isinstance(_id, six.string_types):
|
|
prefix = 'osd.'
|
|
if not _id.startswith(prefix):
|
|
try:
|
|
int(_id)
|
|
except ValueError:
|
|
raise CephClientInvalidOsdIdValue(
|
|
osdid=_id)
|
|
_id = prefix + _id
|
|
elif isinstance(_id, six.integer_types):
|
|
_id = 'osd.{}'.format(_id)
|
|
else:
|
|
raise CephClientInvalidOsdIdValue(
|
|
osdid=_id)
|
|
return _id
|
|
|
|
def _sanitize_osdid_to_int(self, _id):
|
|
if isinstance(_id, six.string_types):
|
|
prefix = 'osd.'
|
|
if _id.startswith(prefix):
|
|
_id = _id[len(prefix):]
|
|
try:
|
|
_id = int(_id)
|
|
except ValueError:
|
|
raise CephClientInvalidOsdIdValue(
|
|
osdid=_id)
|
|
elif not isinstance(_id, six.integer_types):
|
|
raise CephClientInvalidOsdIdValue(
|
|
osdid=_id)
|
|
return _id
|
|
|
|
def osd_create(self, uuid, body='json', timeout=None, params=None):
|
|
"""create new osd (with optional UUID and ID)
|
|
|
|
Notes:
|
|
1. osd create declares it accepts osd id as string but only works when
|
|
given an integer value; it automatically generates an ID otherwise
|
|
instead of using the one provided by 'osd create id=...'
|
|
|
|
2. old cephclient passes osd id through params dictionary
|
|
"""
|
|
kwargs = dict(uuid=uuid, body=body, timeout=timeout)
|
|
try:
|
|
kwargs['id'] = self._sanitize_osdid_to_int(params['id'])
|
|
except (KeyError, TypeError):
|
|
pass
|
|
return self._request('osd create', **kwargs)
|
|
|
|
def osd_rm(self, ids, body='json', timeout=None):
|
|
"""remove osd(s) <id> [<id>...], or use <any|all> to remove all osds """
|
|
if isinstance(ids, list):
|
|
ids = [self._sanitize_osdid_to_str(_id)
|
|
for _id in ids]
|
|
else:
|
|
ids = self._sanitize_osdid_to_str(ids)
|
|
return super(CephWrapper, self).osd_rm(
|
|
ids=ids, body=body, timeout=timeout)
|
|
|
|
def osd_remove(self, ids, body='json', timeout=None):
|
|
return self.osd_rm(ids, body=body, timeout=timeout)
|
|
|
|
def osd_down(self, ids, body='json', timeout=None):
|
|
"""set osd(s) <id> [<id>...] down, or use <any|all> to set all osds down """
|
|
if isinstance(ids, list):
|
|
ids = [self._sanitize_osdid_to_str(_id)
|
|
for _id in ids]
|
|
else:
|
|
ids = self._sanitize_osdid_to_str(ids)
|
|
return super(CephWrapper, self).osd_down(
|
|
ids=ids, body=body, timeout=timeout)
|
|
|
|
OSD_CRUSH_TREE_CONVERTED_FIELDS = [
|
|
'crush_weight', 'depth', 'id', 'name', 'type', 'type_id']
|
|
|
|
def _osd_crush_tree_convert_node(self, node):
|
|
return {k: node[k] for k in self.OSD_CRUSH_TREE_CONVERTED_FIELDS
|
|
if k in node}
|
|
|
|
def _osd_crush_tree_populate_tree(self, node, node_map):
|
|
children = node.get('children')
|
|
node = self._osd_crush_tree_convert_node(node)
|
|
if node['type'] != 'osd':
|
|
node['items'] = []
|
|
for _id in children:
|
|
node['items'].append(
|
|
self._osd_crush_tree_populate_tree(
|
|
node_map[_id], node_map))
|
|
return node
|
|
|
|
def osd_crush_tree(self, shadow=None, body='json', timeout=None):
|
|
"""dump crush buckets and items in a tree view """
|
|
response, _body = super(CephWrapper, self).osd_crush_tree(
|
|
shadow=shadow, body=body, timeout=timeout)
|
|
trees = []
|
|
if response.ok and body == 'json' \
|
|
and 'output' in _body:
|
|
node_map = {}
|
|
root_nodes = []
|
|
for node in _body['output']:
|
|
node_map[node['id']] = node
|
|
if node['type'] == 'root':
|
|
root_nodes.append(node)
|
|
for root in root_nodes:
|
|
trees.append(
|
|
self._osd_crush_tree_populate_tree(
|
|
root, node_map))
|
|
_body['output'] = trees
|
|
return response, _body
|
|
|
|
def _osd_crush_rule_by_ruleset(self, ruleset, timeout=None):
|
|
response, _body = self.osd_crush_rule_dump(
|
|
body='json', timeout=timeout)
|
|
if not response.ok:
|
|
return response, _body
|
|
name = None
|
|
for rule in _body['output']:
|
|
if rule.get('ruleset') == ruleset:
|
|
name = rule.get('rule_name')
|
|
_body['output'] = dict(rule=name)
|
|
return response, _body
|
|
|
|
def _osd_crush_ruleset_by_rule(self, rule, timeout=None):
|
|
response, _body = self.osd_crush_rule_dump(
|
|
name=rule, body='json', timeout=timeout)
|
|
return response, _body
|
|
|
|
def osd_pool_create(self, pool, pg_num, pgp_num=None, pool_type=None,
|
|
erasure_code_profile=None, ruleset=None,
|
|
expected_num_objects=None, body='json', timeout=None):
|
|
"""create pool
|
|
|
|
Notes:
|
|
1. map 'ruleset' to 'rule' (assuming 1:1 correspondence)
|
|
"""
|
|
response, _body = self._osd_crush_rule_by_ruleset(ruleset)
|
|
if not response.ok:
|
|
return response, _body
|
|
rule = _body['output']['rule']
|
|
return super(CephWrapper, self).osd_pool_create(
|
|
pool, pg_num, pgp_num=pgp_num, pool_type=pool_type,
|
|
erasure_code_profile=erasure_code_profile, rule=rule,
|
|
expected_num_objects=expected_num_objects, body=body,
|
|
timeout=timeout)
|
|
|
|
def osd_get_pool_param(self, pool, var, body='json', timeout=None):
|
|
"""get pool parameter <var> """
|
|
if var == 'crush_ruleset':
|
|
response, _body = super(CephWrapper, self).osd_pool_get(
|
|
pool, 'crush_rule', body='json', timeout=timeout)
|
|
if response.ok:
|
|
rule = _body['output']['crush_rule']
|
|
del _body['output']['crush_rule']
|
|
response, _body = self._osd_crush_ruleset_by_rule(
|
|
rule, timeout=timeout)
|
|
if response.ok:
|
|
_body['output'] = dict(
|
|
crush_ruleset=_body['output']['ruleset'])
|
|
return response, _body
|
|
else:
|
|
return super(CephWrapper, self).osd_pool_get(
|
|
pool, var, body=body, timeout=timeout)
|
|
|
|
def osd_pool_set(self, pool, var, val, force=None,
|
|
body='json', timeout=None):
|
|
"""set pool parameter <var> to <val> """
|
|
return super(CephWrapper, self).osd_pool_set(
|
|
pool=pool, var=var, val=str(val),
|
|
force=force, body=body, timeout=timeout)
|
|
|
|
def osd_set_pool_param(self, pool, var, val, force=None,
|
|
body='json', timeout=None):
|
|
"""set pool parameter <var> to <val> """
|
|
if var == 'crush_ruleset':
|
|
var = 'crush_rule'
|
|
response, _body = self._osd_crush_rule_by_ruleset(
|
|
val, timeout=timeout)
|
|
if not response.ok:
|
|
return response, _body
|
|
val = _body['output']['rule']
|
|
return super(CephWrapper, self).osd_pool_set(
|
|
pool, var, str(val), force=None,
|
|
body=body, timeout=timeout)
|
|
|
|
def osd_get_pool_quota(self, pool, body='json', timeout=None):
|
|
"""obtain object or byte limits for pool """
|
|
return super(CephWrapper, self).osd_pool_get_quota(
|
|
pool, body=body, timeout=timeout)
|
|
|
|
def osd_set_pool_quota(self, pool, field, val, body='json', timeout=None):
|
|
"""set object or byte limit on pool """
|
|
return super(CephWrapper, self).osd_pool_set_quota(
|
|
pool, field, str(val), body=body, timeout=timeout)
|
|
|
|
def osd_pool_set_quota(self, pool, field, val,
|
|
body='json', timeout=None):
|
|
"""set object or byte limit on pool """
|
|
return super(CephWrapper, self).osd_pool_set_quota(
|
|
pool=pool, field=field, val=str(val),
|
|
body=body, timeout=timeout)
|
|
|
|
def _auth_convert_caps(self, caps):
|
|
if caps:
|
|
if not isinstance(caps, dict):
|
|
raise CephClientTypeError(
|
|
name='caps',
|
|
actual=type(caps),
|
|
expected=dict)
|
|
_caps = []
|
|
for key, value in list(caps.items()):
|
|
_caps.append(key)
|
|
_caps.append(value)
|
|
caps = _caps
|
|
return caps
|
|
|
|
def auth_add(self, entity, caps=None, body='json', timeout=None):
|
|
"""add auth info for <entity> from input file, or random key if no input is given, and/or any caps specified in the command """
|
|
caps = self._auth_convert_caps(caps)
|
|
return super(CephWrapper, self).auth_add(
|
|
entity, caps=caps, body=body, timeout=timeout)
|
|
|
|
def auth_caps(self, entity, caps, body='json', timeout=None):
|
|
"""update caps for <name> from caps specified in the command """
|
|
caps = self._auth_convert_caps(caps)
|
|
return super(CephWrapper, self).auth_caps(
|
|
entity, caps=caps, body=body, timeout=timeout)
|
|
|
|
def auth_get_or_create(self, entity, caps=None, body='json', timeout=None):
|
|
"""add auth info for <entity> from input file, or random key if no input given, and/or any caps specified in the command """
|
|
caps = self._auth_convert_caps(caps)
|
|
return super(CephWrapper, self).auth_get_or_create(
|
|
entity, caps, body=body, timeout=timeout)
|
|
|
|
def auth_get_or_create_key(self, entity, caps=None,
|
|
body='json', timeout=None):
|
|
|
|
"""get, or add, key for <name> from system/caps pairs specified in the command. If key already exists, any given caps must match the existing caps for that key. """
|
|
caps = self._auth_convert_caps(caps)
|
|
response, _body = super(CephWrapper, self).auth_get_or_create_key(
|
|
entity, caps, body=body, timeout=timeout)
|
|
if response.ok:
|
|
_body['output'] = _body['output']
|
|
return response, _body
|
|
|
|
def osd_set_key(self, key, sure=None, body='json', timeout=None):
|
|
"""set <key> """
|
|
return self.osd_set(key, sure=sure, body=body, timeout=timeout)
|