From 2ecb4f641aa2e6a7e7e13af8edb7c58bcebe8e3d Mon Sep 17 00:00:00 2001 From: Chris MacNaughton Date: Tue, 6 Dec 2016 14:34:42 -0500 Subject: [PATCH] Add broker request to move Disk This allows a remote charm, ideally an OSD unit, to request that the monitor move a disk to a given bucket in the Crushmap Change-Id: I77d8159d588f3b31d6eb1c25cd5ca16e53c6f01d --- ceph/__init__.py | 27 ++++++++++++++ ceph/ceph_broker.py | 38 ++++++++++++++++++- ceph/ceph_helpers.py | 5 +++ unit_tests/test_ceph.py | 67 ++++++++++++++++++++++++++++++++++ unit_tests/test_ceph_broker.py | 30 +++++++++++++++ 5 files changed, 166 insertions(+), 1 deletion(-) diff --git a/ceph/__init__.py b/ceph/__init__.py index 94fd0a1..7f80b2c 100644 --- a/ceph/__init__.py +++ b/ceph/__init__.py @@ -455,6 +455,33 @@ class CrushLocation(object): return self.name < other.name +def get_osd_weight(osd_id): + """ + Returns the weight of the specified OSD + :return: Float :raise: ValueError if the monmap fails to parse. + Also raises CalledProcessError if our ceph command fails + """ + try: + tree = subprocess.check_output( + ['ceph', 'osd', 'tree', '--format=json']) + try: + json_tree = json.loads(tree) + # Make sure children are present in the json + if not json_tree['nodes']: + return None + for device in json_tree['nodes']: + if device['type'] == 'osd' and device['name'] == osd_id: + return device['crush_weight'] + except ValueError as v: + log("Unable to parse ceph tree json: {}. Error: {}".format( + tree, v.message)) + raise + except subprocess.CalledProcessError as e: + log("ceph osd tree command failed with message: {}".format( + e.message)) + raise + + def get_osd_tree(service): """ Returns the current osd map in JSON. diff --git a/ceph/ceph_broker.py b/ceph/ceph_broker.py index 0892961..52a55c8 100644 --- a/ceph/ceph_broker.py +++ b/ceph/ceph_broker.py @@ -24,7 +24,11 @@ from charmhelpers.core.hookenv import ( INFO, ERROR, ) -from ceph import get_cephfs +from ceph import ( + get_cephfs, + get_osd_weight +) +from ceph_helpers import Crushmap from charmhelpers.contrib.storage.linux.ceph import ( create_erasure_profile, delete_pool, @@ -360,6 +364,36 @@ def handle_rgw_zone_set(request, service): os.unlink(infile.name) +def handle_put_osd_in_bucket(request, service): + osd_id = request.get('osd') + target_bucket = request.get('bucket') + if not osd_id or not target_bucket: + msg = "Missing OSD ID or Bucket" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + crushmap = Crushmap() + try: + crushmap.ensure_bucket_is_present(target_bucket) + check_output( + [ + 'ceph', + '--id', service, + 'osd', + 'crush', + 'set', + osd_id, + get_osd_weight(osd_id), + "root={}".format(target_bucket) + ] + ) + + except Exception as exc: + msg = "Failed to move OSD " \ + "{} into Bucket {} :: {}".format(osd_id, target_bucket, exc) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + def handle_rgw_create_user(request, service): user_id = request.get('rgw-uid') display_name = request.get('display-name') @@ -534,6 +568,8 @@ def process_requests_v1(reqs): ret = handle_rgw_regionmap_default(request=req, service=svc) elif op == "rgw-create-user": ret = handle_rgw_create_user(request=req, service=svc) + elif op == "move-osd-to-bucket": + ret = handle_put_osd_in_bucket(request=req, service=svc) else: msg = "Unknown operation '%s'" % op log(msg, level=ERROR) diff --git a/ceph/ceph_helpers.py b/ceph/ceph_helpers.py index 60cc73f..b73300b 100644 --- a/ceph/ceph_helpers.py +++ b/ceph/ceph_helpers.py @@ -193,6 +193,11 @@ class Crushmap(object): log("load_crushmap error: {}".format(e)) raise "Failed to read Crushmap" + def ensure_bucket_is_present(self, bucket_name): + if bucket_name not in [bucket.name() for bucket in self.buckets()]: + self.add_bucket(bucket_name) + self.save() + def buckets(self): """Return a list of buckets that are in the Crushmap.""" return self._buckets diff --git a/unit_tests/test_ceph.py b/unit_tests/test_ceph.py index 9bb913e..9d67a2d 100644 --- a/unit_tests/test_ceph.py +++ b/unit_tests/test_ceph.py @@ -37,6 +37,73 @@ class CephTestCase(unittest.TestCase): def setUp(self): super(CephTestCase, self).setUp() + @mock.patch('subprocess.check_output') + def test_get_osd_weight(self, output): + """It gives an OSD's weight""" + output.return_value = """{ + "nodes": [{ + "id": -1, + "name": "default", + "type": "root", + "type_id": 10, + "children": [-4, -3, -2] + }, { + "id": -2, + "name": "ip-172-31-11-147", + "type": "host", + "type_id": 1, + "children": [0] + }, { + "id": 0, + "name": "osd.0", + "type": "osd", + "type_id": 0, + "crush_weight": 0.002899, + "depth": 2, + "exists": 1, + "status": "up", + "reweight": 1.000000, + "primary_affinity": 1.000000 + }, { + "id": -3, + "name": "ip-172-31-56-198", + "type": "host", + "type_id": 1, + "children": [2] + }, { + "id": 2, + "name": "osd.2", + "type": "osd", + "type_id": 0, + "crush_weight": 0.002899, + "depth": 2, + "exists": 1, + "status": "up", + "reweight": 1.000000, + "primary_affinity": 1.000000 + }, { + "id": -4, + "name": "ip-172-31-24-103", + "type": "host", + "type_id": 1, + "children": [1] + }, { + "id": 1, + "name": "osd.1", + "type": "osd", + "type_id": 0, + "crush_weight": 0.002899, + "depth": 2, + "exists": 1, + "status": "up", + "reweight": 1.000000, + "primary_affinity": 1.000000 + }], + "stray": [] +}""" + weight = ceph.get_osd_weight('osd.0') + self.assertEqual(weight, 0.002899) + def test_get_named_key_with_pool(self): with mock.patch.object(ceph, "ceph_user", return_value="ceph"): with mock.patch.object(ceph.subprocess, "check_output") \ diff --git a/unit_tests/test_ceph_broker.py b/unit_tests/test_ceph_broker.py index aa00388..5252a2a 100644 --- a/unit_tests/test_ceph_broker.py +++ b/unit_tests/test_ceph_broker.py @@ -146,6 +146,36 @@ class CephBrokerTestCase(unittest.TestCase): self.assertEqual(json.loads(rc)['exit-code'], 0) self.assertEqual(json.loads(rc)['request-id'], '1ef5aede') + @mock.patch('ceph_broker.check_output') + @mock.patch('ceph_helpers.Crushmap.load_crushmap') + @mock.patch('ceph_helpers.Crushmap.ensure_bucket_is_present') + @mock.patch('ceph_broker.get_osd_weight') + @mock.patch('ceph_broker.log') + def test_process_requests_move_osd(self, + mock_log, + get_osd_weight, + ensure_bucket_is_present, + load_crushmap, + check_output): + load_crushmap.return_value = "" + ensure_bucket_is_present.return_value = None + get_osd_weight.return_value = 1 + reqs = json.dumps({'api-version': 1, + 'request-id': '1ef5aede', + 'ops': [{ + 'op': 'move-osd-to-bucket', + 'osd': 'osd.0', + 'bucket': 'test' + }]}) + rc = ceph_broker.process_requests(reqs) + check_output.assert_called_with(["ceph", + '--id', 'admin', + "osd", "crush", "set", + u"osd.0", 1, "root=test"]) + + self.assertEqual(json.loads(rc)['exit-code'], 0) + self.assertEqual(json.loads(rc)['request-id'], '1ef5aede') + @mock.patch('ceph_broker.log') def test_process_requests_invalid_api_rid(self, mock_log): reqs = json.dumps({'api-version': 0, 'request-id': '1ef5aede',