Juju Charm - Ceph OSD
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

broker.py 30KB


  1. # Copyright 2016 Canonical Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import collections
  15. import json
  16. import os
  17. from tempfile import NamedTemporaryFile
  18. from ceph.utils import (
  19. get_cephfs,
  20. get_osd_weight
  21. )
  22. from ceph.crush_utils import Crushmap
  23. from charmhelpers.core.hookenv import (
  24. log,
  25. DEBUG,
  26. INFO,
  27. ERROR,
  28. )
  29. from charmhelpers.contrib.storage.linux.ceph import (
  30. create_erasure_profile,
  31. delete_pool,
  32. erasure_profile_exists,
  33. get_osds,
  34. monitor_key_get,
  35. monitor_key_set,
  36. pool_exists,
  37. pool_set,
  38. remove_pool_snapshot,
  39. rename_pool,
  40. set_pool_quota,
  41. snapshot_pool,
  42. validator,
  43. ErasurePool,
  44. Pool,
  45. ReplicatedPool,
  46. )
  47. # This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
  48. # This should do a decent job of preventing people from passing in bad values.
  49. # It will give a useful error message
  50. from subprocess import check_call, check_output, CalledProcessError
  51. POOL_KEYS = {
  52. # "Ceph Key Name": [Python type, [Valid Range]]
  53. "size": [int],
  54. "min_size": [int],
  55. "crash_replay_interval": [int],
  56. "pgp_num": [int], # = or < pg_num
  57. "crush_ruleset": [int],
  58. "hashpspool": [bool],
  59. "nodelete": [bool],
  60. "nopgchange": [bool],
  61. "nosizechange": [bool],
  62. "write_fadvise_dontneed": [bool],
  63. "noscrub": [bool],
  64. "nodeep-scrub": [bool],
  65. "hit_set_type": [str, ["bloom", "explicit_hash",
  66. "explicit_object"]],
  67. "hit_set_count": [int, [1, 1]],
  68. "hit_set_period": [int],
  69. "hit_set_fpp": [float, [0.0, 1.0]],
  70. "cache_target_dirty_ratio": [float],
  71. "cache_target_dirty_high_ratio": [float],
  72. "cache_target_full_ratio": [float],
  73. "target_max_bytes": [int],
  74. "target_max_objects": [int],
  75. "cache_min_flush_age": [int],
  76. "cache_min_evict_age": [int],
  77. "fast_read": [bool],
  78. "allow_ec_overwrites": [bool],
  79. "compression_mode": [str, ["none", "passive", "aggressive", "force"]],
  80. "compression_algorithm": [str, ["lz4", "snappy", "zlib", "zstd"]],
  81. "compression_required_ratio": [float, [0.0, 1.0]],
  82. }
  83. CEPH_BUCKET_TYPES = [
  84. 'osd',
  85. 'host',
  86. 'chassis',
  87. 'rack',
  88. 'row',
  89. 'pdu',
  90. 'pod',
  91. 'room',
  92. 'datacenter',
  93. 'region',
  94. 'root'
  95. ]
  96. def decode_req_encode_rsp(f):
  97. """Decorator to decode incoming requests and encode responses."""
  98. def decode_inner(req):
  99. return json.dumps(f(json.loads(req)))
  100. return decode_inner
  101. @decode_req_encode_rsp
  102. def process_requests(reqs):
  103. """Process Ceph broker request(s).
  104. This is a versioned api. API version must be supplied by the client making
  105. the request.
  106. :param reqs: dict of request parameters.
  107. :returns: dict. exit-code and reason if not 0
  108. """
  109. request_id = reqs.get('request-id')
  110. try:
  111. version = reqs.get('api-version')
  112. if version == 1:
  113. log('Processing request {}'.format(request_id), level=DEBUG)
  114. resp = process_requests_v1(reqs['ops'])
  115. if request_id:
  116. resp['request-id'] = request_id
  117. return resp
  118. except Exception as exc:
  119. log(str(exc), level=ERROR)
  120. msg = ("Unexpected error occurred while processing requests: %s" %
  121. reqs)
  122. log(msg, level=ERROR)
  123. return {'exit-code': 1, 'stderr': msg}
  124. msg = ("Missing or invalid api version ({})".format(version))
  125. resp = {'exit-code': 1, 'stderr': msg}
  126. if request_id:
  127. resp['request-id'] = request_id
  128. return resp
  129. def handle_create_erasure_profile(request, service):
  130. """Create an erasure profile.
  131. :param request: dict of request operations and params
  132. :param service: The ceph client to run the command under.
  133. :returns: dict. exit-code and reason if not 0
  134. """
  135. # "local" | "shec" or it defaults to "jerasure"
  136. erasure_type = request.get('erasure-type')
  137. # "host" | "rack" or it defaults to "host" # Any valid Ceph bucket
  138. failure_domain = request.get('failure-domain')
  139. name = request.get('name')
  140. k = request.get('k')
  141. m = request.get('m')
  142. l = request.get('l')
  143. if failure_domain not in CEPH_BUCKET_TYPES:
  144. msg = "failure-domain must be one of {}".format(CEPH_BUCKET_TYPES)
  145. log(msg, level=ERROR)
  146. return {'exit-code': 1, 'stderr': msg}
  147. create_erasure_profile(service=service, erasure_plugin_name=erasure_type,
  148. profile_name=name, failure_domain=failure_domain,
  149. data_chunks=k, coding_chunks=m, locality=l)
  150. def handle_add_permissions_to_key(request, service):
  151. """Groups are defined by the key cephx.groups.(namespace-)?-(name). This
  152. key will contain a dict serialized to JSON with data about the group,
  153. including pools and members.
  154. A group can optionally have a namespace defined that will be used to
  155. further restrict pool access.
  156. """
  157. resp = {'exit-code': 0}
  158. service_name = request.get('name')
  159. group_name = request.get('group')
  160. group_namespace = request.get('group-namespace')
  161. if group_namespace:
  162. group_name = "{}-{}".format(group_namespace, group_name)
  163. group = get_group(group_name=group_name)
  164. service_obj = get_service_groups(service=service_name,
  165. namespace=group_namespace)
  166. if request.get('object-prefix-permissions'):
  167. service_obj['object_prefix_perms'] = request.get(
  168. 'object-prefix-permissions')
  169. format("Service object: {}".format(service_obj))
  170. permission = request.get('group-permission') or "rwx"
  171. if service_name not in group['services']:
  172. group['services'].append(service_name)
  173. save_group(group=group, group_name=group_name)
  174. if permission not in service_obj['group_names']:
  175. service_obj['group_names'][permission] = []
  176. if group_name not in service_obj['group_names'][permission]:
  177. service_obj['group_names'][permission].append(group_name)
  178. save_service(service=service_obj, service_name=service_name)
  179. service_obj['groups'] = _build_service_groups(service_obj,
  180. group_namespace)
  181. update_service_permissions(service_name, service_obj, group_namespace)
  182. return resp
  183. def update_service_permissions(service, service_obj=None, namespace=None):
  184. """Update the key permissions for the named client in Ceph"""
  185. if not service_obj:
  186. service_obj = get_service_groups(service=service, namespace=namespace)
  187. permissions = pool_permission_list_for_service(service_obj)
  188. call = ['ceph', 'auth', 'caps', 'client.{}'.format(service)] + permissions
  189. try:
  190. check_call(call)
  191. except CalledProcessError as e:
  192. log("Error updating key capabilities: {}".format(e))
  193. def add_pool_to_group(pool, group, namespace=None):
  194. """Add a named pool to a named group"""
  195. group_name = group
  196. if namespace:
  197. group_name = "{}-{}".format(namespace, group_name)
  198. group = get_group(group_name=group_name)
  199. if pool not in group['pools']:
  200. group["pools"].append(pool)
  201. save_group(group, group_name=group_name)
  202. for service in group['services']:
  203. update_service_permissions(service, namespace=namespace)
  204. def pool_permission_list_for_service(service):
  205. """Build the permission string for Ceph for a given service"""
  206. permissions = []
  207. permission_types = collections.OrderedDict()
  208. for permission, group in sorted(service["group_names"].items()):
  209. if permission not in permission_types:
  210. permission_types[permission] = []
  211. for item in group:
  212. permission_types[permission].append(item)
  213. for permission, groups in permission_types.items():
  214. permission = "allow {}".format(permission)
  215. for group in groups:
  216. for pool in service['groups'][group].get('pools', []):
  217. permissions.append("{} pool={}".format(permission, pool))
  218. for permission, prefixes in sorted(
  219. service.get("object_prefix_perms", {}).items()):
  220. for prefix in prefixes:
  221. permissions.append("allow {} object_prefix {}".format(permission,
  222. prefix))
  223. return ['mon', 'allow r, allow command "osd blacklist"',
  224. 'osd', ', '.join(permissions)]
  225. def get_service_groups(service, namespace=None):
  226. """Services are objects stored with some metadata, they look like (for a
  227. service named "nova"):
  228. {
  229. group_names: {'rwx': ['images']},
  230. groups: {}
  231. }
  232. After populating the group, it looks like:
  233. {
  234. group_names: {'rwx': ['images']},
  235. groups: {
  236. 'images': {
  237. pools: ['glance'],
  238. services: ['nova']
  239. }
  240. }
  241. }
  242. """
  243. service_json = monitor_key_get(service='admin',
  244. key="cephx.services.{}".format(service))
  245. try:
  246. service = json.loads(service_json)
  247. except (TypeError, ValueError):
  248. service = None
  249. if service:
  250. service['groups'] = _build_service_groups(service, namespace)
  251. else:
  252. service = {'group_names': {}, 'groups': {}}
  253. return service
  254. def _build_service_groups(service, namespace=None):
  255. """Rebuild the 'groups' dict for a service group
  256. :returns: dict: dictionary keyed by group name of the following
  257. format:
  258. {
  259. 'images': {
  260. pools: ['glance'],
  261. services: ['nova', 'glance]
  262. },
  263. 'vms':{
  264. pools: ['nova'],
  265. services: ['nova']
  266. }
  267. }
  268. """
  269. all_groups = {}
  270. for groups in service['group_names'].values():
  271. for group in groups:
  272. name = group
  273. if namespace:
  274. name = "{}-{}".format(namespace, name)
  275. all_groups[group] = get_group(group_name=name)
  276. return all_groups
  277. def get_group(group_name):
  278. """A group is a structure to hold data about a named group, structured as:
  279. {
  280. pools: ['glance'],
  281. services: ['nova']
  282. }
  283. """
  284. group_key = get_group_key(group_name=group_name)
  285. group_json = monitor_key_get(service='admin', key=group_key)
  286. try:
  287. group = json.loads(group_json)
  288. except (TypeError, ValueError):
  289. group = None
  290. if not group:
  291. group = {
  292. 'pools': [],
  293. 'services': []
  294. }
  295. return group
  296. def save_service(service_name, service):
  297. """Persist a service in the monitor cluster"""
  298. service['groups'] = {}
  299. return monitor_key_set(service='admin',
  300. key="cephx.services.{}".format(service_name),
  301. value=json.dumps(service, sort_keys=True))
  302. def save_group(group, group_name):
  303. """Persist a group in the monitor cluster"""
  304. group_key = get_group_key(group_name=group_name)
  305. return monitor_key_set(service='admin',
  306. key=group_key,
  307. value=json.dumps(group, sort_keys=True))
  308. def get_group_key(group_name):
  309. """Build group key"""
  310. return 'cephx.groups.{}'.format(group_name)
  311. def handle_erasure_pool(request, service):
  312. """Create a new erasure coded pool.
  313. :param request: dict of request operations and params.
  314. :param service: The ceph client to run the command under.
  315. :returns: dict. exit-code and reason if not 0.
  316. """
  317. pool_name = request.get('name')
  318. erasure_profile = request.get('erasure-profile')
  319. quota = request.get('max-bytes')
  320. weight = request.get('weight')
  321. group_name = request.get('group')
  322. if erasure_profile is None:
  323. erasure_profile = "default-canonical"
  324. app_name = request.get('app-name')
  325. # Check for missing params
  326. if pool_name is None:
  327. msg = "Missing parameter. name is required for the pool"
  328. log(msg, level=ERROR)
  329. return {'exit-code': 1, 'stderr': msg}
  330. if group_name:
  331. group_namespace = request.get('group-namespace')
  332. # Add the pool to the group named "group_name"
  333. add_pool_to_group(pool=pool_name,
  334. group=group_name,
  335. namespace=group_namespace)
  336. # TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds
  337. if not erasure_profile_exists(service=service, name=erasure_profile):
  338. # TODO: Fail and tell them to create the profile or default
  339. msg = ("erasure-profile {} does not exist. Please create it with: "
  340. "create-erasure-profile".format(erasure_profile))
  341. log(msg, level=ERROR)
  342. return {'exit-code': 1, 'stderr': msg}
  343. pool = ErasurePool(service=service, name=pool_name,
  344. erasure_code_profile=erasure_profile,
  345. percent_data=weight, app_name=app_name)
  346. # Ok make the erasure pool
  347. if not pool_exists(service=service, name=pool_name):
  348. log("Creating pool '{}' (erasure_profile={})"
  349. .format(pool.name, erasure_profile), level=INFO)
  350. pool.create()
  351. # Set a quota if requested
  352. if quota is not None:
  353. set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota)
  354. def handle_replicated_pool(request, service):
  355. """Create a new replicated pool.
  356. :param request: dict of request operations and params.
  357. :param service: The ceph client to run the command under.
  358. :returns: dict. exit-code and reason if not 0.
  359. """
  360. pool_name = request.get('name')
  361. replicas = request.get('replicas')
  362. quota = request.get('max-bytes')
  363. weight = request.get('weight')
  364. group_name = request.get('group')
  365. # Optional params
  366. pg_num = request.get('pg_num')
  367. if pg_num:
  368. # Cap pg_num to max allowed just in case.
  369. osds = get_osds(service)
  370. if osds:
  371. pg_num = min(pg_num, (len(osds) * 100 // replicas))
  372. app_name = request.get('app-name')
  373. # Check for missing params
  374. if pool_name is None or replicas is None:
  375. msg = "Missing parameter. name and replicas are required"
  376. log(msg, level=ERROR)
  377. return {'exit-code': 1, 'stderr': msg}
  378. if group_name:
  379. group_namespace = request.get('group-namespace')
  380. # Add the pool to the group named "group_name"
  381. add_pool_to_group(pool=pool_name,
  382. group=group_name,
  383. namespace=group_namespace)
  384. kwargs = {}
  385. if pg_num:
  386. kwargs['pg_num'] = pg_num
  387. if weight:
  388. kwargs['percent_data'] = weight
  389. if replicas:
  390. kwargs['replicas'] = replicas
  391. if app_name:
  392. kwargs['app_name'] = app_name
  393. pool = ReplicatedPool(service=service,
  394. name=pool_name, **kwargs)
  395. if not pool_exists(service=service, name=pool_name):
  396. log("Creating pool '{}' (replicas={})".format(pool.name, replicas),
  397. level=INFO)
  398. pool.create()
  399. else:
  400. log("Pool '{}' already exists - skipping create".format(pool.name),
  401. level=DEBUG)
  402. # Set a quota if requested
  403. if quota is not None:
  404. set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota)
  405. def handle_create_cache_tier(request, service):
  406. """Create a cache tier on a cold pool. Modes supported are
  407. "writeback" and "readonly".
  408. :param request: dict of request operations and params
  409. :param service: The ceph client to run the command under.
  410. :returns: dict. exit-code and reason if not 0
  411. """
  412. # mode = "writeback" | "readonly"
  413. storage_pool = request.get('cold-pool')
  414. cache_pool = request.get('hot-pool')
  415. cache_mode = request.get('mode')
  416. if cache_mode is None:
  417. cache_mode = "writeback"
  418. # cache and storage pool must exist first
  419. if not pool_exists(service=service, name=storage_pool) or not pool_exists(
  420. service=service, name=cache_pool):
  421. msg = ("cold-pool: {} and hot-pool: {} must exist. Please create "
  422. "them first".format(storage_pool, cache_pool))
  423. log(msg, level=ERROR)
  424. return {'exit-code': 1, 'stderr': msg}
  425. p = Pool(service=service, name=storage_pool)
  426. p.add_cache_tier(cache_pool=cache_pool, mode=cache_mode)
  427. def handle_remove_cache_tier(request, service):
  428. """Remove a cache tier from the cold pool.
  429. :param request: dict of request operations and params
  430. :param service: The ceph client to run the command under.
  431. :returns: dict. exit-code and reason if not 0
  432. """
  433. storage_pool = request.get('cold-pool')
  434. cache_pool = request.get('hot-pool')
  435. # cache and storage pool must exist first
  436. if not pool_exists(service=service, name=storage_pool) or not pool_exists(
  437. service=service, name=cache_pool):
  438. msg = ("cold-pool: {} or hot-pool: {} doesn't exist. Not "
  439. "deleting cache tier".format(storage_pool, cache_pool))
  440. log(msg, level=ERROR)
  441. return {'exit-code': 1, 'stderr': msg}
  442. pool = Pool(name=storage_pool, service=service)
  443. pool.remove_cache_tier(cache_pool=cache_pool)
  444. def handle_set_pool_value(request, service):
  445. """Sets an arbitrary pool value.
  446. :param request: dict of request operations and params
  447. :param service: The ceph client to run the command under.
  448. :returns: dict. exit-code and reason if not 0
  449. """
  450. # Set arbitrary pool values
  451. params = {'pool': request.get('name'),
  452. 'key': request.get('key'),
  453. 'value': request.get('value')}
  454. if params['key'] not in POOL_KEYS:
  455. msg = "Invalid key '{}'".format(params['key'])
  456. log(msg, level=ERROR)
  457. return {'exit-code': 1, 'stderr': msg}
  458. # Get the validation method
  459. validator_params = POOL_KEYS[params['key']]
  460. if len(validator_params) is 1:
  461. # Validate that what the user passed is actually legal per Ceph's rules
  462. validator(params['value'], validator_params[0])
  463. else:
  464. # Validate that what the user passed is actually legal per Ceph's rules
  465. validator(params['value'], validator_params[0], validator_params[1])
  466. # Set the value
  467. pool_set(service=service, pool_name=params['pool'], key=params['key'],
  468. value=params['value'])
  469. def handle_rgw_regionmap_update(request, service):
  470. """Change the radosgw region map.
  471. :param request: dict of request operations and params
  472. :param service: The ceph client to run the command under.
  473. :returns: dict. exit-code and reason if not 0
  474. """
  475. name = request.get('client-name')
  476. if not name:
  477. msg = "Missing rgw-region or client-name params"
  478. log(msg, level=ERROR)
  479. return {'exit-code': 1, 'stderr': msg}
  480. try:
  481. check_output(['radosgw-admin',
  482. '--id', service,
  483. 'regionmap', 'update', '--name', name])
  484. except CalledProcessError as err:
  485. log(err.output, level=ERROR)
  486. return {'exit-code': 1, 'stderr': err.output}
  487. def handle_rgw_regionmap_default(request, service):
  488. """Create a radosgw region map.
  489. :param request: dict of request operations and params
  490. :param service: The ceph client to run the command under.
  491. :returns: dict. exit-code and reason if not 0
  492. """
  493. region = request.get('rgw-region')
  494. name = request.get('client-name')
  495. if not region or not name:
  496. msg = "Missing rgw-region or client-name params"
  497. log(msg, level=ERROR)
  498. return {'exit-code': 1, 'stderr': msg}
  499. try:
  500. check_output(
  501. [
  502. 'radosgw-admin',
  503. '--id', service,
  504. 'regionmap',
  505. 'default',
  506. '--rgw-region', region,
  507. '--name', name])
  508. except CalledProcessError as err:
  509. log(err.output, level=ERROR)
  510. return {'exit-code': 1, 'stderr': err.output}
  511. def handle_rgw_zone_set(request, service):
  512. """Create a radosgw zone.
  513. :param request: dict of request operations and params
  514. :param service: The ceph client to run the command under.
  515. :returns: dict. exit-code and reason if not 0
  516. """
  517. json_file = request.get('zone-json')
  518. name = request.get('client-name')
  519. region_name = request.get('region-name')
  520. zone_name = request.get('zone-name')
  521. if not json_file or not name or not region_name or not zone_name:
  522. msg = "Missing json-file or client-name params"
  523. log(msg, level=ERROR)
  524. return {'exit-code': 1, 'stderr': msg}
  525. infile = NamedTemporaryFile(delete=False)
  526. with open(infile.name, 'w') as infile_handle:
  527. infile_handle.write(json_file)
  528. try:
  529. check_output(
  530. [
  531. 'radosgw-admin',
  532. '--id', service,
  533. 'zone',
  534. 'set',
  535. '--rgw-zone', zone_name,
  536. '--infile', infile.name,
  537. '--name', name,
  538. ]
  539. )
  540. except CalledProcessError as err:
  541. log(err.output, level=ERROR)
  542. return {'exit-code': 1, 'stderr': err.output}
  543. os.unlink(infile.name)
  544. def handle_put_osd_in_bucket(request, service):
  545. """Move an osd into a specified crush bucket.
  546. :param request: dict of request operations and params
  547. :param service: The ceph client to run the command under.
  548. :returns: dict. exit-code and reason if not 0
  549. """
  550. osd_id = request.get('osd')
  551. target_bucket = request.get('bucket')
  552. if not osd_id or not target_bucket:
  553. msg = "Missing OSD ID or Bucket"
  554. log(msg, level=ERROR)
  555. return {'exit-code': 1, 'stderr': msg}
  556. crushmap = Crushmap()
  557. try:
  558. crushmap.ensure_bucket_is_present(target_bucket)
  559. check_output(
  560. [
  561. 'ceph',
  562. '--id', service,
  563. 'osd',
  564. 'crush',
  565. 'set',
  566. str(osd_id),
  567. str(get_osd_weight(osd_id)),
  568. "root={}".format(target_bucket)
  569. ]
  570. )
  571. except Exception as exc:
  572. msg = "Failed to move OSD " \
  573. "{} into Bucket {} :: {}".format(osd_id, target_bucket, exc)
  574. log(msg, level=ERROR)
  575. return {'exit-code': 1, 'stderr': msg}
  576. def handle_rgw_create_user(request, service):
  577. """Create a new rados gateway user.
  578. :param request: dict of request operations and params
  579. :param service: The ceph client to run the command under.
  580. :returns: dict. exit-code and reason if not 0
  581. """
  582. user_id = request.get('rgw-uid')
  583. display_name = request.get('display-name')
  584. name = request.get('client-name')
  585. if not name or not display_name or not user_id:
  586. msg = "Missing client-name, display-name or rgw-uid"
  587. log(msg, level=ERROR)
  588. return {'exit-code': 1, 'stderr': msg}
  589. try:
  590. create_output = check_output(
  591. [
  592. 'radosgw-admin',
  593. '--id', service,
  594. 'user',
  595. 'create',
  596. '--uid', user_id,
  597. '--display-name', display_name,
  598. '--name', name,
  599. '--system'
  600. ]
  601. )
  602. try:
  603. user_json = json.loads(str(create_output.decode('UTF-8')))
  604. return {'exit-code': 0, 'user': user_json}
  605. except ValueError as err:
  606. log(err, level=ERROR)
  607. return {'exit-code': 1, 'stderr': err}
  608. except CalledProcessError as err:
  609. log(err.output, level=ERROR)
  610. return {'exit-code': 1, 'stderr': err.output}
  611. def handle_create_cephfs(request, service):
  612. """Create a new cephfs.
  613. :param request: The broker request
  614. :param service: The ceph client to run the command under.
  615. :returns: dict. exit-code and reason if not 0
  616. """
  617. cephfs_name = request.get('mds_name')
  618. data_pool = request.get('data_pool')
  619. metadata_pool = request.get('metadata_pool')
  620. # Check if the user params were provided
  621. if not cephfs_name or not data_pool or not metadata_pool:
  622. msg = "Missing mds_name, data_pool or metadata_pool params"
  623. log(msg, level=ERROR)
  624. return {'exit-code': 1, 'stderr': msg}
  625. # Sanity check that the required pools exist
  626. if not pool_exists(service=service, name=data_pool):
  627. msg = "CephFS data pool does not exist. Cannot create CephFS"
  628. log(msg, level=ERROR)
  629. return {'exit-code': 1, 'stderr': msg}
  630. if not pool_exists(service=service, name=metadata_pool):
  631. msg = "CephFS metadata pool does not exist. Cannot create CephFS"
  632. log(msg, level=ERROR)
  633. return {'exit-code': 1, 'stderr': msg}
  634. if get_cephfs(service=service):
  635. # CephFS new has already been called
  636. log("CephFS already created")
  637. return
  638. # Finally create CephFS
  639. try:
  640. check_output(["ceph",
  641. '--id', service,
  642. "fs", "new", cephfs_name,
  643. metadata_pool,
  644. data_pool])
  645. except CalledProcessError as err:
  646. if err.returncode == 22:
  647. log("CephFS already created")
  648. return
  649. else:
  650. log(err.output, level=ERROR)
  651. return {'exit-code': 1, 'stderr': err.output}
  652. def handle_rgw_region_set(request, service):
  653. # radosgw-admin region set --infile us.json --name client.radosgw.us-east-1
  654. """Set the rados gateway region.
  655. :param request: dict. The broker request.
  656. :param service: The ceph client to run the command under.
  657. :returns: dict. exit-code and reason if not 0
  658. """
  659. json_file = request.get('region-json')
  660. name = request.get('client-name')
  661. region_name = request.get('region-name')
  662. zone_name = request.get('zone-name')
  663. if not json_file or not name or not region_name or not zone_name:
  664. msg = "Missing json-file or client-name params"
  665. log(msg, level=ERROR)
  666. return {'exit-code': 1, 'stderr': msg}
  667. infile = NamedTemporaryFile(delete=False)
  668. with open(infile.name, 'w') as infile_handle:
  669. infile_handle.write(json_file)
  670. try:
  671. check_output(
  672. [
  673. 'radosgw-admin',
  674. '--id', service,
  675. 'region',
  676. 'set',
  677. '--rgw-zone', zone_name,
  678. '--infile', infile.name,
  679. '--name', name,
  680. ]
  681. )
  682. except CalledProcessError as err:
  683. log(err.output, level=ERROR)
  684. return {'exit-code': 1, 'stderr': err.output}
  685. os.unlink(infile.name)
  686. def process_requests_v1(reqs):
  687. """Process v1 requests.
  688. Takes a list of requests (dicts) and processes each one. If an error is
  689. found, processing stops and the client is notified in the response.
  690. Returns a response dict containing the exit code (non-zero if any
  691. operation failed along with an explanation).
  692. """
  693. ret = None
  694. log("Processing {} ceph broker requests".format(len(reqs)), level=INFO)
  695. for req in reqs:
  696. op = req.get('op')
  697. log("Processing op='{}'".format(op), level=DEBUG)
  698. # Use admin client since we do not have other client key locations
  699. # setup to use them for these operations.
  700. svc = 'admin'
  701. if op == "create-pool":
  702. pool_type = req.get('pool-type') # "replicated" | "erasure"
  703. # Default to replicated if pool_type isn't given
  704. if pool_type == 'erasure':
  705. ret = handle_erasure_pool(request=req, service=svc)
  706. else:
  707. ret = handle_replicated_pool(request=req, service=svc)
  708. elif op == "create-cephfs":
  709. ret = handle_create_cephfs(request=req, service=svc)
  710. elif op == "create-cache-tier":
  711. ret = handle_create_cache_tier(request=req, service=svc)
  712. elif op == "remove-cache-tier":
  713. ret = handle_remove_cache_tier(request=req, service=svc)
  714. elif op == "create-erasure-profile":
  715. ret = handle_create_erasure_profile(request=req, service=svc)
  716. elif op == "delete-pool":
  717. pool = req.get('name')
  718. ret = delete_pool(service=svc, name=pool)
  719. elif op == "rename-pool":
  720. old_name = req.get('name')
  721. new_name = req.get('new-name')
  722. ret = rename_pool(service=svc, old_name=old_name,
  723. new_name=new_name)
  724. elif op == "snapshot-pool":
  725. pool = req.get('name')
  726. snapshot_name = req.get('snapshot-name')
  727. ret = snapshot_pool(service=svc, pool_name=pool,
  728. snapshot_name=snapshot_name)
  729. elif op == "remove-pool-snapshot":
  730. pool = req.get('name')
  731. snapshot_name = req.get('snapshot-name')
  732. ret = remove_pool_snapshot(service=svc, pool_name=pool,
  733. snapshot_name=snapshot_name)
  734. elif op == "set-pool-value":
  735. ret = handle_set_pool_value(request=req, service=svc)
  736. elif op == "rgw-region-set":
  737. ret = handle_rgw_region_set(request=req, service=svc)
  738. elif op == "rgw-zone-set":
  739. ret = handle_rgw_zone_set(request=req, service=svc)
  740. elif op == "rgw-regionmap-update":
  741. ret = handle_rgw_regionmap_update(request=req, service=svc)
  742. elif op == "rgw-regionmap-default":
  743. ret = handle_rgw_regionmap_default(request=req, service=svc)
  744. elif op == "rgw-create-user":
  745. ret = handle_rgw_create_user(request=req, service=svc)
  746. elif op == "move-osd-to-bucket":
  747. ret = handle_put_osd_in_bucket(request=req, service=svc)
  748. elif op == "add-permissions-to-key":
  749. ret = handle_add_permissions_to_key(request=req, service=svc)
  750. else:
  751. msg = "Unknown operation '{}'".format(op)
  752. log(msg, level=ERROR)
  753. return {'exit-code': 1, 'stderr': msg}
  754. if type(ret) == dict and 'exit-code' in ret:
  755. return ret
  756. return {'exit-code': 0}