Juju Charm - Ceph OSD
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

broker.py 30KB


  1. # Copyright 2016 Canonical Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import collections
  15. import json
  16. import os
  17. from tempfile import NamedTemporaryFile
  18. from ceph.utils import (
  19. get_cephfs,
  20. get_osd_weight
  21. )
  22. from ceph.crush_utils import Crushmap
  23. from charmhelpers.core.hookenv import (
  24. log,
  25. DEBUG,
  26. INFO,
  27. ERROR,
  28. )
  29. from charmhelpers.contrib.storage.linux.ceph import (
  30. create_erasure_profile,
  31. delete_pool,
  32. erasure_profile_exists,
  33. get_osds,
  34. monitor_key_get,
  35. monitor_key_set,
  36. pool_exists,
  37. pool_set,
  38. remove_pool_snapshot,
  39. rename_pool,
  40. set_pool_quota,
  41. snapshot_pool,
  42. validator,
  43. ErasurePool,
  44. Pool,
  45. ReplicatedPool,
  46. )
  47. # This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
  48. # This should do a decent job of preventing people from passing in bad values.
  49. # It will give a useful error message
  50. from subprocess import check_call, check_output, CalledProcessError
  51. POOL_KEYS = {
  52. # "Ceph Key Name": [Python type, [Valid Range]]
  53. "size": [int],
  54. "min_size": [int],
  55. "crash_replay_interval": [int],
  56. "pgp_num": [int], # = or < pg_num
  57. "crush_ruleset": [int],
  58. "hashpspool": [bool],
  59. "nodelete": [bool],
  60. "nopgchange": [bool],
  61. "nosizechange": [bool],
  62. "write_fadvise_dontneed": [bool],
  63. "noscrub": [bool],
  64. "nodeep-scrub": [bool],
  65. "hit_set_type": [str, ["bloom", "explicit_hash",
  66. "explicit_object"]],
  67. "hit_set_count": [int, [1, 1]],
  68. "hit_set_period": [int],
  69. "hit_set_fpp": [float, [0.0, 1.0]],
  70. "cache_target_dirty_ratio": [float],
  71. "cache_target_dirty_high_ratio": [float],
  72. "cache_target_full_ratio": [float],
  73. "target_max_bytes": [int],
  74. "target_max_objects": [int],
  75. "cache_min_flush_age": [int],
  76. "cache_min_evict_age": [int],
  77. "fast_read": [bool],
  78. }
  79. CEPH_BUCKET_TYPES = [
  80. 'osd',
  81. 'host',
  82. 'chassis',
  83. 'rack',
  84. 'row',
  85. 'pdu',
  86. 'pod',
  87. 'room',
  88. 'datacenter',
  89. 'region',
  90. 'root'
  91. ]
  92. def decode_req_encode_rsp(f):
  93. """Decorator to decode incoming requests and encode responses."""
  94. def decode_inner(req):
  95. return json.dumps(f(json.loads(req)))
  96. return decode_inner
  97. @decode_req_encode_rsp
  98. def process_requests(reqs):
  99. """Process Ceph broker request(s).
  100. This is a versioned api. API version must be supplied by the client making
  101. the request.
  102. :param reqs: dict of request parameters.
  103. :returns: dict. exit-code and reason if not 0
  104. """
  105. request_id = reqs.get('request-id')
  106. try:
  107. version = reqs.get('api-version')
  108. if version == 1:
  109. log('Processing request {}'.format(request_id), level=DEBUG)
  110. resp = process_requests_v1(reqs['ops'])
  111. if request_id:
  112. resp['request-id'] = request_id
  113. return resp
  114. except Exception as exc:
  115. log(str(exc), level=ERROR)
  116. msg = ("Unexpected error occurred while processing requests: %s" %
  117. reqs)
  118. log(msg, level=ERROR)
  119. return {'exit-code': 1, 'stderr': msg}
  120. msg = ("Missing or invalid api version ({})".format(version))
  121. resp = {'exit-code': 1, 'stderr': msg}
  122. if request_id:
  123. resp['request-id'] = request_id
  124. return resp
  125. def handle_create_erasure_profile(request, service):
  126. """Create an erasure profile.
  127. :param request: dict of request operations and params
  128. :param service: The ceph client to run the command under.
  129. :returns: dict. exit-code and reason if not 0
  130. """
  131. # "local" | "shec" or it defaults to "jerasure"
  132. erasure_type = request.get('erasure-type')
  133. # "host" | "rack" or it defaults to "host" # Any valid Ceph bucket
  134. failure_domain = request.get('failure-domain')
  135. name = request.get('name')
  136. k = request.get('k')
  137. m = request.get('m')
  138. l = request.get('l')
  139. if failure_domain not in CEPH_BUCKET_TYPES:
  140. msg = "failure-domain must be one of {}".format(CEPH_BUCKET_TYPES)
  141. log(msg, level=ERROR)
  142. return {'exit-code': 1, 'stderr': msg}
  143. create_erasure_profile(service=service, erasure_plugin_name=erasure_type,
  144. profile_name=name, failure_domain=failure_domain,
  145. data_chunks=k, coding_chunks=m, locality=l)
  146. def handle_add_permissions_to_key(request, service):
  147. """Groups are defined by the key cephx.groups.(namespace-)?-(name). This
  148. key will contain a dict serialized to JSON with data about the group,
  149. including pools and members.
  150. A group can optionally have a namespace defined that will be used to
  151. further restrict pool access.
  152. """
  153. resp = {'exit-code': 0}
  154. service_name = request.get('name')
  155. group_name = request.get('group')
  156. group_namespace = request.get('group-namespace')
  157. if group_namespace:
  158. group_name = "{}-{}".format(group_namespace, group_name)
  159. group = get_group(group_name=group_name)
  160. service_obj = get_service_groups(service=service_name,
  161. namespace=group_namespace)
  162. if request.get('object-prefix-permissions'):
  163. service_obj['object_prefix_perms'] = request.get(
  164. 'object-prefix-permissions')
  165. format("Service object: {}".format(service_obj))
  166. permission = request.get('group-permission') or "rwx"
  167. if service_name not in group['services']:
  168. group['services'].append(service_name)
  169. save_group(group=group, group_name=group_name)
  170. if permission not in service_obj['group_names']:
  171. service_obj['group_names'][permission] = []
  172. if group_name not in service_obj['group_names'][permission]:
  173. service_obj['group_names'][permission].append(group_name)
  174. save_service(service=service_obj, service_name=service_name)
  175. service_obj['groups'] = _build_service_groups(service_obj,
  176. group_namespace)
  177. update_service_permissions(service_name, service_obj, group_namespace)
  178. return resp
  179. def update_service_permissions(service, service_obj=None, namespace=None):
  180. """Update the key permissions for the named client in Ceph"""
  181. if not service_obj:
  182. service_obj = get_service_groups(service=service, namespace=namespace)
  183. permissions = pool_permission_list_for_service(service_obj)
  184. call = ['ceph', 'auth', 'caps', 'client.{}'.format(service)] + permissions
  185. try:
  186. check_call(call)
  187. except CalledProcessError as e:
  188. log("Error updating key capabilities: {}".format(e))
  189. def add_pool_to_group(pool, group, namespace=None):
  190. """Add a named pool to a named group"""
  191. group_name = group
  192. if namespace:
  193. group_name = "{}-{}".format(namespace, group_name)
  194. group = get_group(group_name=group_name)
  195. if pool not in group['pools']:
  196. group["pools"].append(pool)
  197. save_group(group, group_name=group_name)
  198. for service in group['services']:
  199. update_service_permissions(service, namespace=namespace)
  200. def pool_permission_list_for_service(service):
  201. """Build the permission string for Ceph for a given service"""
  202. permissions = []
  203. permission_types = collections.OrderedDict()
  204. for permission, group in sorted(service["group_names"].items()):
  205. if permission not in permission_types:
  206. permission_types[permission] = []
  207. for item in group:
  208. permission_types[permission].append(item)
  209. for permission, groups in permission_types.items():
  210. permission = "allow {}".format(permission)
  211. for group in groups:
  212. for pool in service['groups'][group].get('pools', []):
  213. permissions.append("{} pool={}".format(permission, pool))
  214. for permission, prefixes in sorted(
  215. service.get("object_prefix_perms", {}).items()):
  216. for prefix in prefixes:
  217. permissions.append("allow {} object_prefix {}".format(permission,
  218. prefix))
  219. return ["mon", "allow r", "osd", ', '.join(permissions)]
  220. def get_service_groups(service, namespace=None):
  221. """Services are objects stored with some metadata, they look like (for a
  222. service named "nova"):
  223. {
  224. group_names: {'rwx': ['images']},
  225. groups: {}
  226. }
  227. After populating the group, it looks like:
  228. {
  229. group_names: {'rwx': ['images']},
  230. groups: {
  231. 'images': {
  232. pools: ['glance'],
  233. services: ['nova']
  234. }
  235. }
  236. }
  237. """
  238. service_json = monitor_key_get(service='admin',
  239. key="cephx.services.{}".format(service))
  240. try:
  241. service = json.loads(service_json)
  242. except (TypeError, ValueError):
  243. service = None
  244. if service:
  245. service['groups'] = _build_service_groups(service, namespace)
  246. else:
  247. service = {'group_names': {}, 'groups': {}}
  248. return service
  249. def _build_service_groups(service, namespace=None):
  250. """Rebuild the 'groups' dict for a service group
  251. :returns: dict: dictionary keyed by group name of the following
  252. format:
  253. {
  254. 'images': {
  255. pools: ['glance'],
  256. services: ['nova', 'glance]
  257. },
  258. 'vms':{
  259. pools: ['nova'],
  260. services: ['nova']
  261. }
  262. }
  263. """
  264. all_groups = {}
  265. for groups in service['group_names'].values():
  266. for group in groups:
  267. name = group
  268. if namespace:
  269. name = "{}-{}".format(namespace, name)
  270. all_groups[group] = get_group(group_name=name)
  271. return all_groups
  272. def get_group(group_name):
  273. """A group is a structure to hold data about a named group, structured as:
  274. {
  275. pools: ['glance'],
  276. services: ['nova']
  277. }
  278. """
  279. group_key = get_group_key(group_name=group_name)
  280. group_json = monitor_key_get(service='admin', key=group_key)
  281. try:
  282. group = json.loads(group_json)
  283. except (TypeError, ValueError):
  284. group = None
  285. if not group:
  286. group = {
  287. 'pools': [],
  288. 'services': []
  289. }
  290. return group
  291. def save_service(service_name, service):
  292. """Persist a service in the monitor cluster"""
  293. service['groups'] = {}
  294. return monitor_key_set(service='admin',
  295. key="cephx.services.{}".format(service_name),
  296. value=json.dumps(service, sort_keys=True))
  297. def save_group(group, group_name):
  298. """Persist a group in the monitor cluster"""
  299. group_key = get_group_key(group_name=group_name)
  300. return monitor_key_set(service='admin',
  301. key=group_key,
  302. value=json.dumps(group, sort_keys=True))
  303. def get_group_key(group_name):
  304. """Build group key"""
  305. return 'cephx.groups.{}'.format(group_name)
  306. def handle_erasure_pool(request, service):
  307. """Create a new erasure coded pool.
  308. :param request: dict of request operations and params.
  309. :param service: The ceph client to run the command under.
  310. :returns: dict. exit-code and reason if not 0.
  311. """
  312. pool_name = request.get('name')
  313. erasure_profile = request.get('erasure-profile')
  314. quota = request.get('max-bytes')
  315. weight = request.get('weight')
  316. group_name = request.get('group')
  317. if erasure_profile is None:
  318. erasure_profile = "default-canonical"
  319. app_name = request.get('app-name')
  320. # Check for missing params
  321. if pool_name is None:
  322. msg = "Missing parameter. name is required for the pool"
  323. log(msg, level=ERROR)
  324. return {'exit-code': 1, 'stderr': msg}
  325. if group_name:
  326. group_namespace = request.get('group-namespace')
  327. # Add the pool to the group named "group_name"
  328. add_pool_to_group(pool=pool_name,
  329. group=group_name,
  330. namespace=group_namespace)
  331. # TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds
  332. if not erasure_profile_exists(service=service, name=erasure_profile):
  333. # TODO: Fail and tell them to create the profile or default
  334. msg = ("erasure-profile {} does not exist. Please create it with: "
  335. "create-erasure-profile".format(erasure_profile))
  336. log(msg, level=ERROR)
  337. return {'exit-code': 1, 'stderr': msg}
  338. pool = ErasurePool(service=service, name=pool_name,
  339. erasure_code_profile=erasure_profile,
  340. percent_data=weight, app_name=app_name)
  341. # Ok make the erasure pool
  342. if not pool_exists(service=service, name=pool_name):
  343. log("Creating pool '{}' (erasure_profile={})"
  344. .format(pool.name, erasure_profile), level=INFO)
  345. pool.create()
  346. # Set a quota if requested
  347. if quota is not None:
  348. set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota)
  349. def handle_replicated_pool(request, service):
  350. """Create a new replicated pool.
  351. :param request: dict of request operations and params.
  352. :param service: The ceph client to run the command under.
  353. :returns: dict. exit-code and reason if not 0.
  354. """
  355. pool_name = request.get('name')
  356. replicas = request.get('replicas')
  357. quota = request.get('max-bytes')
  358. weight = request.get('weight')
  359. group_name = request.get('group')
  360. # Optional params
  361. pg_num = request.get('pg_num')
  362. if pg_num:
  363. # Cap pg_num to max allowed just in case.
  364. osds = get_osds(service)
  365. if osds:
  366. pg_num = min(pg_num, (len(osds) * 100 // replicas))
  367. app_name = request.get('app-name')
  368. # Check for missing params
  369. if pool_name is None or replicas is None:
  370. msg = "Missing parameter. name and replicas are required"
  371. log(msg, level=ERROR)
  372. return {'exit-code': 1, 'stderr': msg}
  373. if group_name:
  374. group_namespace = request.get('group-namespace')
  375. # Add the pool to the group named "group_name"
  376. add_pool_to_group(pool=pool_name,
  377. group=group_name,
  378. namespace=group_namespace)
  379. kwargs = {}
  380. if pg_num:
  381. kwargs['pg_num'] = pg_num
  382. if weight:
  383. kwargs['percent_data'] = weight
  384. if replicas:
  385. kwargs['replicas'] = replicas
  386. if app_name:
  387. kwargs['app_name'] = app_name
  388. pool = ReplicatedPool(service=service,
  389. name=pool_name, **kwargs)
  390. if not pool_exists(service=service, name=pool_name):
  391. log("Creating pool '{}' (replicas={})".format(pool.name, replicas),
  392. level=INFO)
  393. pool.create()
  394. else:
  395. log("Pool '{}' already exists - skipping create".format(pool.name),
  396. level=DEBUG)
  397. # Set a quota if requested
  398. if quota is not None:
  399. set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota)
  400. def handle_create_cache_tier(request, service):
  401. """Create a cache tier on a cold pool. Modes supported are
  402. "writeback" and "readonly".
  403. :param request: dict of request operations and params
  404. :param service: The ceph client to run the command under.
  405. :returns: dict. exit-code and reason if not 0
  406. """
  407. # mode = "writeback" | "readonly"
  408. storage_pool = request.get('cold-pool')
  409. cache_pool = request.get('hot-pool')
  410. cache_mode = request.get('mode')
  411. if cache_mode is None:
  412. cache_mode = "writeback"
  413. # cache and storage pool must exist first
  414. if not pool_exists(service=service, name=storage_pool) or not pool_exists(
  415. service=service, name=cache_pool):
  416. msg = ("cold-pool: {} and hot-pool: {} must exist. Please create "
  417. "them first".format(storage_pool, cache_pool))
  418. log(msg, level=ERROR)
  419. return {'exit-code': 1, 'stderr': msg}
  420. p = Pool(service=service, name=storage_pool)
  421. p.add_cache_tier(cache_pool=cache_pool, mode=cache_mode)
  422. def handle_remove_cache_tier(request, service):
  423. """Remove a cache tier from the cold pool.
  424. :param request: dict of request operations and params
  425. :param service: The ceph client to run the command under.
  426. :returns: dict. exit-code and reason if not 0
  427. """
  428. storage_pool = request.get('cold-pool')
  429. cache_pool = request.get('hot-pool')
  430. # cache and storage pool must exist first
  431. if not pool_exists(service=service, name=storage_pool) or not pool_exists(
  432. service=service, name=cache_pool):
  433. msg = ("cold-pool: {} or hot-pool: {} doesn't exist. Not "
  434. "deleting cache tier".format(storage_pool, cache_pool))
  435. log(msg, level=ERROR)
  436. return {'exit-code': 1, 'stderr': msg}
  437. pool = Pool(name=storage_pool, service=service)
  438. pool.remove_cache_tier(cache_pool=cache_pool)
  439. def handle_set_pool_value(request, service):
  440. """Sets an arbitrary pool value.
  441. :param request: dict of request operations and params
  442. :param service: The ceph client to run the command under.
  443. :returns: dict. exit-code and reason if not 0
  444. """
  445. # Set arbitrary pool values
  446. params = {'pool': request.get('name'),
  447. 'key': request.get('key'),
  448. 'value': request.get('value')}
  449. if params['key'] not in POOL_KEYS:
  450. msg = "Invalid key '{}'".format(params['key'])
  451. log(msg, level=ERROR)
  452. return {'exit-code': 1, 'stderr': msg}
  453. # Get the validation method
  454. validator_params = POOL_KEYS[params['key']]
  455. if len(validator_params) is 1:
  456. # Validate that what the user passed is actually legal per Ceph's rules
  457. validator(params['value'], validator_params[0])
  458. else:
  459. # Validate that what the user passed is actually legal per Ceph's rules
  460. validator(params['value'], validator_params[0], validator_params[1])
  461. # Set the value
  462. pool_set(service=service, pool_name=params['pool'], key=params['key'],
  463. value=params['value'])
  464. def handle_rgw_regionmap_update(request, service):
  465. """Change the radosgw region map.
  466. :param request: dict of request operations and params
  467. :param service: The ceph client to run the command under.
  468. :returns: dict. exit-code and reason if not 0
  469. """
  470. name = request.get('client-name')
  471. if not name:
  472. msg = "Missing rgw-region or client-name params"
  473. log(msg, level=ERROR)
  474. return {'exit-code': 1, 'stderr': msg}
  475. try:
  476. check_output(['radosgw-admin',
  477. '--id', service,
  478. 'regionmap', 'update', '--name', name])
  479. except CalledProcessError as err:
  480. log(err.output, level=ERROR)
  481. return {'exit-code': 1, 'stderr': err.output}
  482. def handle_rgw_regionmap_default(request, service):
  483. """Create a radosgw region map.
  484. :param request: dict of request operations and params
  485. :param service: The ceph client to run the command under.
  486. :returns: dict. exit-code and reason if not 0
  487. """
  488. region = request.get('rgw-region')
  489. name = request.get('client-name')
  490. if not region or not name:
  491. msg = "Missing rgw-region or client-name params"
  492. log(msg, level=ERROR)
  493. return {'exit-code': 1, 'stderr': msg}
  494. try:
  495. check_output(
  496. [
  497. 'radosgw-admin',
  498. '--id', service,
  499. 'regionmap',
  500. 'default',
  501. '--rgw-region', region,
  502. '--name', name])
  503. except CalledProcessError as err:
  504. log(err.output, level=ERROR)
  505. return {'exit-code': 1, 'stderr': err.output}
  506. def handle_rgw_zone_set(request, service):
  507. """Create a radosgw zone.
  508. :param request: dict of request operations and params
  509. :param service: The ceph client to run the command under.
  510. :returns: dict. exit-code and reason if not 0
  511. """
  512. json_file = request.get('zone-json')
  513. name = request.get('client-name')
  514. region_name = request.get('region-name')
  515. zone_name = request.get('zone-name')
  516. if not json_file or not name or not region_name or not zone_name:
  517. msg = "Missing json-file or client-name params"
  518. log(msg, level=ERROR)
  519. return {'exit-code': 1, 'stderr': msg}
  520. infile = NamedTemporaryFile(delete=False)
  521. with open(infile.name, 'w') as infile_handle:
  522. infile_handle.write(json_file)
  523. try:
  524. check_output(
  525. [
  526. 'radosgw-admin',
  527. '--id', service,
  528. 'zone',
  529. 'set',
  530. '--rgw-zone', zone_name,
  531. '--infile', infile.name,
  532. '--name', name,
  533. ]
  534. )
  535. except CalledProcessError as err:
  536. log(err.output, level=ERROR)
  537. return {'exit-code': 1, 'stderr': err.output}
  538. os.unlink(infile.name)
  539. def handle_put_osd_in_bucket(request, service):
  540. """Move an osd into a specified crush bucket.
  541. :param request: dict of request operations and params
  542. :param service: The ceph client to run the command under.
  543. :returns: dict. exit-code and reason if not 0
  544. """
  545. osd_id = request.get('osd')
  546. target_bucket = request.get('bucket')
  547. if not osd_id or not target_bucket:
  548. msg = "Missing OSD ID or Bucket"
  549. log(msg, level=ERROR)
  550. return {'exit-code': 1, 'stderr': msg}
  551. crushmap = Crushmap()
  552. try:
  553. crushmap.ensure_bucket_is_present(target_bucket)
  554. check_output(
  555. [
  556. 'ceph',
  557. '--id', service,
  558. 'osd',
  559. 'crush',
  560. 'set',
  561. str(osd_id),
  562. str(get_osd_weight(osd_id)),
  563. "root={}".format(target_bucket)
  564. ]
  565. )
  566. except Exception as exc:
  567. msg = "Failed to move OSD " \
  568. "{} into Bucket {} :: {}".format(osd_id, target_bucket, exc)
  569. log(msg, level=ERROR)
  570. return {'exit-code': 1, 'stderr': msg}
  571. def handle_rgw_create_user(request, service):
  572. """Create a new rados gateway user.
  573. :param request: dict of request operations and params
  574. :param service: The ceph client to run the command under.
  575. :returns: dict. exit-code and reason if not 0
  576. """
  577. user_id = request.get('rgw-uid')
  578. display_name = request.get('display-name')
  579. name = request.get('client-name')
  580. if not name or not display_name or not user_id:
  581. msg = "Missing client-name, display-name or rgw-uid"
  582. log(msg, level=ERROR)
  583. return {'exit-code': 1, 'stderr': msg}
  584. try:
  585. create_output = check_output(
  586. [
  587. 'radosgw-admin',
  588. '--id', service,
  589. 'user',
  590. 'create',
  591. '--uid', user_id,
  592. '--display-name', display_name,
  593. '--name', name,
  594. '--system'
  595. ]
  596. )
  597. try:
  598. user_json = json.loads(str(create_output.decode('UTF-8')))
  599. return {'exit-code': 0, 'user': user_json}
  600. except ValueError as err:
  601. log(err, level=ERROR)
  602. return {'exit-code': 1, 'stderr': err}
  603. except CalledProcessError as err:
  604. log(err.output, level=ERROR)
  605. return {'exit-code': 1, 'stderr': err.output}
  606. def handle_create_cephfs(request, service):
  607. """Create a new cephfs.
  608. :param request: The broker request
  609. :param service: The ceph client to run the command under.
  610. :returns: dict. exit-code and reason if not 0
  611. """
  612. cephfs_name = request.get('mds_name')
  613. data_pool = request.get('data_pool')
  614. metadata_pool = request.get('metadata_pool')
  615. # Check if the user params were provided
  616. if not cephfs_name or not data_pool or not metadata_pool:
  617. msg = "Missing mds_name, data_pool or metadata_pool params"
  618. log(msg, level=ERROR)
  619. return {'exit-code': 1, 'stderr': msg}
  620. # Sanity check that the required pools exist
  621. if not pool_exists(service=service, name=data_pool):
  622. msg = "CephFS data pool does not exist. Cannot create CephFS"
  623. log(msg, level=ERROR)
  624. return {'exit-code': 1, 'stderr': msg}
  625. if not pool_exists(service=service, name=metadata_pool):
  626. msg = "CephFS metadata pool does not exist. Cannot create CephFS"
  627. log(msg, level=ERROR)
  628. return {'exit-code': 1, 'stderr': msg}
  629. if get_cephfs(service=service):
  630. # CephFS new has already been called
  631. log("CephFS already created")
  632. return
  633. # Finally create CephFS
  634. try:
  635. check_output(["ceph",
  636. '--id', service,
  637. "fs", "new", cephfs_name,
  638. metadata_pool,
  639. data_pool])
  640. except CalledProcessError as err:
  641. if err.returncode == 22:
  642. log("CephFS already created")
  643. return
  644. else:
  645. log(err.output, level=ERROR)
  646. return {'exit-code': 1, 'stderr': err.output}
  647. def handle_rgw_region_set(request, service):
  648. # radosgw-admin region set --infile us.json --name client.radosgw.us-east-1
  649. """Set the rados gateway region.
  650. :param request: dict. The broker request.
  651. :param service: The ceph client to run the command under.
  652. :returns: dict. exit-code and reason if not 0
  653. """
  654. json_file = request.get('region-json')
  655. name = request.get('client-name')
  656. region_name = request.get('region-name')
  657. zone_name = request.get('zone-name')
  658. if not json_file or not name or not region_name or not zone_name:
  659. msg = "Missing json-file or client-name params"
  660. log(msg, level=ERROR)
  661. return {'exit-code': 1, 'stderr': msg}
  662. infile = NamedTemporaryFile(delete=False)
  663. with open(infile.name, 'w') as infile_handle:
  664. infile_handle.write(json_file)
  665. try:
  666. check_output(
  667. [
  668. 'radosgw-admin',
  669. '--id', service,
  670. 'region',
  671. 'set',
  672. '--rgw-zone', zone_name,
  673. '--infile', infile.name,
  674. '--name', name,
  675. ]
  676. )
  677. except CalledProcessError as err:
  678. log(err.output, level=ERROR)
  679. return {'exit-code': 1, 'stderr': err.output}
  680. os.unlink(infile.name)
  681. def process_requests_v1(reqs):
  682. """Process v1 requests.
  683. Takes a list of requests (dicts) and processes each one. If an error is
  684. found, processing stops and the client is notified in the response.
  685. Returns a response dict containing the exit code (non-zero if any
  686. operation failed along with an explanation).
  687. """
  688. ret = None
  689. log("Processing {} ceph broker requests".format(len(reqs)), level=INFO)
  690. for req in reqs:
  691. op = req.get('op')
  692. log("Processing op='{}'".format(op), level=DEBUG)
  693. # Use admin client since we do not have other client key locations
  694. # setup to use them for these operations.
  695. svc = 'admin'
  696. if op == "create-pool":
  697. pool_type = req.get('pool-type') # "replicated" | "erasure"
  698. # Default to replicated if pool_type isn't given
  699. if pool_type == 'erasure':
  700. ret = handle_erasure_pool(request=req, service=svc)
  701. else:
  702. ret = handle_replicated_pool(request=req, service=svc)
  703. elif op == "create-cephfs":
  704. ret = handle_create_cephfs(request=req, service=svc)
  705. elif op == "create-cache-tier":
  706. ret = handle_create_cache_tier(request=req, service=svc)
  707. elif op == "remove-cache-tier":
  708. ret = handle_remove_cache_tier(request=req, service=svc)
  709. elif op == "create-erasure-profile":
  710. ret = handle_create_erasure_profile(request=req, service=svc)
  711. elif op == "delete-pool":
  712. pool = req.get('name')
  713. ret = delete_pool(service=svc, name=pool)
  714. elif op == "rename-pool":
  715. old_name = req.get('name')
  716. new_name = req.get('new-name')
  717. ret = rename_pool(service=svc, old_name=old_name,
  718. new_name=new_name)
  719. elif op == "snapshot-pool":
  720. pool = req.get('name')
  721. snapshot_name = req.get('snapshot-name')
  722. ret = snapshot_pool(service=svc, pool_name=pool,
  723. snapshot_name=snapshot_name)
  724. elif op == "remove-pool-snapshot":
  725. pool = req.get('name')
  726. snapshot_name = req.get('snapshot-name')
  727. ret = remove_pool_snapshot(service=svc, pool_name=pool,
  728. snapshot_name=snapshot_name)
  729. elif op == "set-pool-value":
  730. ret = handle_set_pool_value(request=req, service=svc)
  731. elif op == "rgw-region-set":
  732. ret = handle_rgw_region_set(request=req, service=svc)
  733. elif op == "rgw-zone-set":
  734. ret = handle_rgw_zone_set(request=req, service=svc)
  735. elif op == "rgw-regionmap-update":
  736. ret = handle_rgw_regionmap_update(request=req, service=svc)
  737. elif op == "rgw-regionmap-default":
  738. ret = handle_rgw_regionmap_default(request=req, service=svc)
  739. elif op == "rgw-create-user":
  740. ret = handle_rgw_create_user(request=req, service=svc)
  741. elif op == "move-osd-to-bucket":
  742. ret = handle_put_osd_in_bucket(request=req, service=svc)
  743. elif op == "add-permissions-to-key":
  744. ret = handle_add_permissions_to_key(request=req, service=svc)
  745. else:
  746. msg = "Unknown operation '{}'".format(op)
  747. log(msg, level=ERROR)
  748. return {'exit-code': 1, 'stderr': msg}
  749. if type(ret) == dict and 'exit-code' in ret:
  750. return ret
  751. return {'exit-code': 0}