Add broker request to move disk

This allows a remote charm, typically an OSD unit,
to request that the monitor move a disk to a given
bucket in the Crushmap.

Change-Id: I77d8159d588f3b31d6eb1c25cd5ca16e53c6f01d
Chris MacNaughton 2016-12-06 14:34:42 -05:00
parent c04b57a79f
commit 2ecb4f641a
5 changed files with 166 additions and 1 deletion
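
For context, the operation described in the commit message travels through the existing Ceph broker protocol: the remote charm sends a JSON request and the monitor unit dispatches it to the new handler. A minimal sketch of such a request, assuming the broker JSON layout exercised by the unit tests below (the op name and the 'osd'/'bucket' keys come from this change; the request id and the bucket name 'fast-ssd' are hypothetical example values):

import json

# Broker request a remote (e.g. OSD) charm might send to the monitor.
# 'move-osd-to-bucket', 'osd' and 'bucket' match the handler added in this
# change; the request-id and bucket name are made-up example values.
request = json.dumps({
    'api-version': 1,
    'request-id': 'example-request-id',
    'ops': [{
        'op': 'move-osd-to-bucket',
        'osd': 'osd.0',
        'bucket': 'fast-ssd',
    }],
})
print(request)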


@@ -455,6 +455,33 @@ class CrushLocation(object):
        return self.name < other.name


def get_osd_weight(osd_id):
    """Returns the weight of the specified OSD.

    :return: Float
    :raise: ValueError if the monmap fails to parse.
        Also raises CalledProcessError if our ceph command fails
    """
    try:
        tree = subprocess.check_output(
            ['ceph', 'osd', 'tree', '--format=json'])
        try:
            json_tree = json.loads(tree)
            # Make sure children are present in the json
            if not json_tree['nodes']:
                return None
            for device in json_tree['nodes']:
                if device['type'] == 'osd' and device['name'] == osd_id:
                    return device['crush_weight']
        except ValueError as v:
            log("Unable to parse ceph tree json: {}. Error: {}".format(
                tree, v.message))
            raise
    except subprocess.CalledProcessError as e:
        log("ceph osd tree command failed with message: {}".format(
            e.message))
        raise


def get_osd_tree(service):
    """
    Returns the current osd map in JSON.


@@ -24,7 +24,11 @@ from charmhelpers.core.hookenv import (
    INFO,
    ERROR,
)
from ceph import get_cephfs
from ceph import (
    get_cephfs,
    get_osd_weight
)
from ceph_helpers import Crushmap
from charmhelpers.contrib.storage.linux.ceph import (
    create_erasure_profile,
    delete_pool,
@@ -360,6 +364,36 @@ def handle_rgw_zone_set(request, service):
    os.unlink(infile.name)


def handle_put_osd_in_bucket(request, service):
    osd_id = request.get('osd')
    target_bucket = request.get('bucket')
    if not osd_id or not target_bucket:
        msg = "Missing OSD ID or Bucket"
        log(msg, level=ERROR)
        return {'exit-code': 1, 'stderr': msg}
    crushmap = Crushmap()
    try:
        crushmap.ensure_bucket_is_present(target_bucket)
        check_output(
            [
                'ceph',
                '--id', service,
                'osd',
                'crush',
                'set',
                osd_id,
                str(get_osd_weight(osd_id)),
                "root={}".format(target_bucket)
            ]
        )
    except Exception as exc:
        msg = "Failed to move OSD " \
              "{} into Bucket {} :: {}".format(osd_id, target_bucket, exc)
        log(msg, level=ERROR)
        return {'exit-code': 1, 'stderr': msg}


def handle_rgw_create_user(request, service):
    user_id = request.get('rgw-uid')
    display_name = request.get('display-name')
@@ -534,6 +568,8 @@ def process_requests_v1(reqs):
            ret = handle_rgw_regionmap_default(request=req, service=svc)
        elif op == "rgw-create-user":
            ret = handle_rgw_create_user(request=req, service=svc)
        elif op == "move-osd-to-bucket":
            ret = handle_put_osd_in_bucket(request=req, service=svc)
        else:
            msg = "Unknown operation '%s'" % op
            log(msg, level=ERROR)
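
For illustration, a successful request of this kind reduces to a single CRUSH command. A minimal sketch of the call the handler ends up issuing, assuming the 'admin' cephx id and hypothetical values for the OSD's current weight and the target bucket:

import subprocess

# Equivalent of handle_put_osd_in_bucket for osd.0 -> bucket 'fast-ssd';
# the weight 0.002899 stands in for whatever get_osd_weight() reports, and
# the bucket name is a made-up example.
subprocess.check_output([
    'ceph', '--id', 'admin',
    'osd', 'crush', 'set',
    'osd.0', '0.002899', 'root=fast-ssd',
])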


@@ -193,6 +193,11 @@ class Crushmap(object):
            log("load_crushmap error: {}".format(e))
            raise "Failed to read Crushmap"

    def ensure_bucket_is_present(self, bucket_name):
        if bucket_name not in [bucket.name() for bucket in self.buckets()]:
            self.add_bucket(bucket_name)
            self.save()

    def buckets(self):
        """Return a list of buckets that are in the Crushmap."""
        return self._buckets


@@ -37,6 +37,73 @@ class CephTestCase(unittest.TestCase):
    def setUp(self):
        super(CephTestCase, self).setUp()

    @mock.patch('subprocess.check_output')
    def test_get_osd_weight(self, output):
        """It gives an OSD's weight"""
        output.return_value = """{
    "nodes": [{
        "id": -1,
        "name": "default",
        "type": "root",
        "type_id": 10,
        "children": [-4, -3, -2]
    }, {
        "id": -2,
        "name": "ip-172-31-11-147",
        "type": "host",
        "type_id": 1,
        "children": [0]
    }, {
        "id": 0,
        "name": "osd.0",
        "type": "osd",
        "type_id": 0,
        "crush_weight": 0.002899,
        "depth": 2,
        "exists": 1,
        "status": "up",
        "reweight": 1.000000,
        "primary_affinity": 1.000000
    }, {
        "id": -3,
        "name": "ip-172-31-56-198",
        "type": "host",
        "type_id": 1,
        "children": [2]
    }, {
        "id": 2,
        "name": "osd.2",
        "type": "osd",
        "type_id": 0,
        "crush_weight": 0.002899,
        "depth": 2,
        "exists": 1,
        "status": "up",
        "reweight": 1.000000,
        "primary_affinity": 1.000000
    }, {
        "id": -4,
        "name": "ip-172-31-24-103",
        "type": "host",
        "type_id": 1,
        "children": [1]
    }, {
        "id": 1,
        "name": "osd.1",
        "type": "osd",
        "type_id": 0,
        "crush_weight": 0.002899,
        "depth": 2,
        "exists": 1,
        "status": "up",
        "reweight": 1.000000,
        "primary_affinity": 1.000000
    }],
    "stray": []
}"""
        weight = ceph.get_osd_weight('osd.0')
        self.assertEqual(weight, 0.002899)

    def test_get_named_key_with_pool(self):
        with mock.patch.object(ceph, "ceph_user", return_value="ceph"):
            with mock.patch.object(ceph.subprocess, "check_output") \


@@ -146,6 +146,36 @@ class CephBrokerTestCase(unittest.TestCase):
        self.assertEqual(json.loads(rc)['exit-code'], 0)
        self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')

    @mock.patch('ceph_broker.check_output')
    @mock.patch('ceph_helpers.Crushmap.load_crushmap')
    @mock.patch('ceph_helpers.Crushmap.ensure_bucket_is_present')
    @mock.patch('ceph_broker.get_osd_weight')
    @mock.patch('ceph_broker.log')
    def test_process_requests_move_osd(self,
                                       mock_log,
                                       get_osd_weight,
                                       ensure_bucket_is_present,
                                       load_crushmap,
                                       check_output):
        load_crushmap.return_value = ""
        ensure_bucket_is_present.return_value = None
        get_osd_weight.return_value = 1
        reqs = json.dumps({'api-version': 1,
                           'request-id': '1ef5aede',
                           'ops': [{
                               'op': 'move-osd-to-bucket',
                               'osd': 'osd.0',
                               'bucket': 'test'
                           }]})
        rc = ceph_broker.process_requests(reqs)
        check_output.assert_called_with(["ceph",
                                         '--id', 'admin',
                                         "osd", "crush", "set",
                                         u"osd.0", '1', "root=test"])
        self.assertEqual(json.loads(rc)['exit-code'], 0)
        self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')

    @mock.patch('ceph_broker.log')
    def test_process_requests_invalid_api_rid(self, mock_log):
        reqs = json.dumps({'api-version': 0, 'request-id': '1ef5aede',