Sync in charms.ceph

This brings in the new broker change to restrict
key access by groups

Change-Id: I19ad0142b4227ba555a0794e8b938372d9fdb84c
Partial-Bug: 1424771
Chris MacNaughton 2017-02-10 07:54:14 -05:00
parent 91a8a01056
commit 3dfeff7a19
2 changed files with 244 additions and 21 deletions
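
As an illustration of the new capability, a broker request using the added "add-permissions-to-key" op might look roughly like the sketch below; the field names mirror handle_add_permissions_to_key() in the broker changes further down, while the service and group values are invented examples and the api-version/ops envelope is assumed from the existing broker protocol.

    # Illustration only, not part of the commit: a broker request exercising
    # the new "add-permissions-to-key" operation. 'glance' and 'images' are
    # made-up example values.
    request = {
        'api-version': 1,
        'ops': [{
            'op': 'add-permissions-to-key',
            'name': 'glance',             # cephx key client.glance is updated
            'group': 'images',            # group whose pools should be granted
            'group-namespace': None,      # optional namespace prefix
            'group-permission': 'rwx',    # defaults to "rwx" when omitted
        }],
    }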


@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from _ctypes import POINTER, byref
 import ctypes
 import collections
 import json
@@ -309,22 +310,52 @@ def set_hdd_read_ahead(dev_name, read_ahead_sectors=256):
 def get_block_uuid(block_dev):
     """
-    This queries blkid to get the uuid for a block device.
-
+    This queries blkid to get the uuid for a block device. Note: This function
+    needs to be called with root priv. It will raise an error otherwise.
     :param block_dev: Name of the block device to query.
-    :return: The UUID of the device or None on Error.
+    :return: The UUID of the device or None on Error. Raises OSError
     """
     try:
-        block_info = subprocess.check_output(
-            ['blkid', '-o', 'export', block_dev])
-        for tag in block_info.split('\n'):
-            parts = tag.split('=')
-            if parts[0] == 'UUID':
-                return parts[1]
-        return None
-    except subprocess.CalledProcessError as err:
-        log('get_block_uuid failed with error: {}'.format(err.output),
+        blkid = ctypes.cdll.LoadLibrary("libblkid.so")
+        # Header signature
+        # extern int blkid_probe_lookup_value(blkid_probe pr, const char *name,
+        #                                     const char **data, size_t *len);
+        blkid.blkid_new_probe_from_filename.argtypes = [ctypes.c_char_p]
+        blkid.blkid_probe_lookup_value.argtypes = [ctypes.c_void_p,
+                                                   ctypes.c_char_p,
+                                                   POINTER(ctypes.c_char_p),
+                                                   POINTER(ctypes.c_ulong)]
+    except OSError as err:
+        log('get_block_uuid loading libblkid.so failed with error: {}'.format(
+            os.strerror(err.errno)),
             level=ERROR)
+        raise err
+    if not os.path.exists(block_dev):
         return None
+    probe = blkid.blkid_new_probe_from_filename(ctypes.c_char_p(block_dev))
+    if probe < 0:
+        log('get_block_uuid new_probe_from_filename failed: {}'.format(
+            os.strerror(probe)),
+            level=ERROR)
+        raise OSError(probe, os.strerror(probe))
+    result = blkid.blkid_do_probe(probe)
+    if result != 0:
+        log('get_block_uuid do_probe failed with error: {}'.format(
+            os.strerror(result)),
+            level=ERROR)
+        raise OSError(result, os.strerror(result))
+    uuid = ctypes.c_char_p()
+    result = blkid.blkid_probe_lookup_value(probe,
+                                            ctypes.c_char_p(
+                                                'UUID'.encode('ascii')),
+                                            byref(uuid), None)
+    if result < 0:
+        log('get_block_uuid lookup_value failed with error: {}'.format(
+            os.strerror(result)),
+            level=ERROR)
+        raise OSError(result, os.strerror(result))
+    blkid.blkid_free_probe(probe)
+    return ctypes.string_at(uuid).decode('ascii')
 
 
 def check_max_sectors(save_settings_dict,
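
A brief usage sketch for the reworked get_block_uuid(): it now drives libblkid through ctypes, must be called with root privileges, and raises OSError on probe failures rather than returning None, so a caller might guard it roughly like this (the device path is a made-up example):

    # Sketch only; '/dev/sdb1' is an example device path.
    try:
        uuid = get_block_uuid('/dev/sdb1')
    except OSError:
        uuid = None
    if uuid is None:
        # No UUID available; tune_dev() below simply returns in this case.
        pass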
@@ -390,6 +421,7 @@ def tune_dev(block_dev):
     if uuid is None:
         log('block device {} uuid is None. Unable to save to '
             'hdparm.conf'.format(block_dev), level=DEBUG)
+        return
     save_settings_dict = {}
     log('Tuning device {}'.format(block_dev))
     status_set('maintenance', 'Tuning device {}'.format(block_dev))
@@ -1430,10 +1462,17 @@ def upgrade_monitor(new_version):
             service_stop('ceph-mon-all')
         apt_install(packages=PACKAGES, fatal=True)
 
-        # Ensure the ownership of Ceph's directories is correct
-        chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
-               owner=ceph_user(),
-               group=ceph_user())
+        # Ensure the files and directories under /var/lib/ceph is chowned
+        # properly as part of the move to the Jewel release, which moved the
+        # ceph daemons to running as ceph:ceph instead of root:root.
+        if new_version == 'jewel':
+            # Ensure the ownership of Ceph's directories is correct
+            owner = ceph_user()
+            chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
+                   owner=owner,
+                   group=owner,
+                   follow_links=True)
+
         if systemd():
             for mon_id in get_local_mon_ids():
                 service_start('ceph-mon@{}'.format(mon_id))
@@ -1608,10 +1647,18 @@ def upgrade_osd(new_version):
             service_stop('ceph-osd-all')
         apt_install(packages=PACKAGES, fatal=True)
 
-        # Ensure the ownership of Ceph's directories is correct
-        chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
-               owner=ceph_user(),
-               group=ceph_user())
+        # Ensure the files and directories under /var/lib/ceph is chowned
+        # properly as part of the move to the Jewel release, which moved the
+        # ceph daemons to running as ceph:ceph instead of root:root. Only do
+        # it when necessary as this is an expensive operation to run.
+        if new_version == 'jewel':
+            owner = ceph_user()
+            status_set('maintenance', 'Updating file ownership for OSDs')
+            chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
+                   owner=owner,
+                   group=owner,
+                   follow_links=True)
+
         if systemd():
             for osd_id in get_local_osd_ids():
                 service_start('ceph-osd@{}'.format(osd_id))
@@ -1642,7 +1689,6 @@ def list_pools(service):
         log("rados lspools failed with error: {}".format(err.output))
         raise
 
-
 # A dict of valid ceph upgrade paths. Mapping is old -> new
 UPGRADE_PATHS = {
     'firefly': 'hammer',


@@ -34,6 +34,8 @@ from charmhelpers.contrib.storage.linux.ceph import (
     delete_pool,
     erasure_profile_exists,
     get_osds,
+    monitor_key_get,
+    monitor_key_set,
     pool_exists,
     pool_set,
     remove_pool_snapshot,
@@ -49,7 +51,7 @@ from charmhelpers.contrib.storage.linux.ceph import (
 # This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
 # This should do a decent job of preventing people from passing in bad values.
 # It will give a useful error message
-from subprocess import check_output, CalledProcessError
+from subprocess import check_call, check_output, CalledProcessError
 
 POOL_KEYS = {
     # "Ceph Key Name": [Python type, [Valid Range]]
@@ -157,11 +159,169 @@ def handle_create_erasure_profile(request, service):
                            data_chunks=k, coding_chunks=m, locality=l)
 
 
+def handle_add_permissions_to_key(request, service):
+    """
+    Groups are defined by the key cephx.groups.(namespace-)?-(name). This key
+    will contain a dict serialized to JSON with data about the group, including
+    pools and members.
+
+    A group can optionally have a namespace defined that will be used to
+    further restrict pool access.
+    """
+    service_name = request.get('name')
+    group_name = request.get('group')
+    group_namespace = request.get('group-namespace')
+    if group_namespace:
+        group_name = "{}-{}".format(group_namespace, group_name)
+    group = get_group(group_name=group_name)
+    service_obj = get_service_groups(service=service_name,
+                                     namespace=group_namespace)
+    format("Service object: {}".format(service_obj))
+    permission = request.get('group-permission') or "rwx"
+    if service_name not in group['services']:
+        group['services'].append(service_name)
+    save_group(group=group, group_name=group_name)
+    if permission not in service_obj['group_names']:
+        service_obj['group_names'][permission] = []
+    if group_name not in service_obj['group_names'][permission]:
+        service_obj['group_names'][permission].append(group_name)
+    save_service(service=service_obj, service_name=service_name)
+    service_obj['groups'][group_name] = group
+    update_service_permissions(service_name, service_obj, group_namespace)
+
+
+def update_service_permissions(service, service_obj=None, namespace=None):
+    """Update the key permissions for the named client in Ceph"""
+    if not service_obj:
+        service_obj = get_service_groups(service=service, namespace=namespace)
+    permissions = pool_permission_list_for_service(service_obj)
+    call = ['ceph', 'auth', 'caps', 'client.{}'.format(service)] + permissions
+    try:
+        check_call(call)
+    except CalledProcessError as e:
+        log("Error updating key capabilities: {}".format(e))
+
+
+def add_pool_to_group(pool, group, namespace=None):
+    """Add a named pool to a named group"""
+    group_name = group
+    if namespace:
+        group_name = "{}-{}".format(namespace, group_name)
+    group = get_group(group_name=group_name)
+    group["pools"].append(pool)
+    save_group(group, group_name=group_name)
+    for service in group['services']:
+        update_service_permissions(service, namespace=namespace)
+
+
+def pool_permission_list_for_service(service):
+    """Build the permission string for Ceph for a given service"""
+    permissions = ""
+    permission_types = {}
+    for permission, group in service["group_names"].items():
+        if permission not in permission_types:
+            permission_types[permission] = []
+        for item in group:
+            permission_types[permission].append(item)
+    for permission, groups in permission_types.items():
+        permission = " allow {}".format(permission)
+        for group in groups:
+            for pool in service['groups'][group]['pools']:
+                permission = "{} pool={}".format(permission, pool)
+        permissions += permission
+    return ["mon", "allow r", "osd", permissions.strip()]
+
+
+def get_service_groups(service, namespace=None):
+    """
+    Services are objects stored with some metadata, they look like (for a
+    service named "nova"):
+    {
+        group_names: {'rwx': ['images']},
+        groups: {}
+    }
+    After populating the group, it looks like:
+    {
+        group_names: {'rwx': ['images']},
+        groups: {
+            'images': {
+                pools: ['glance'],
+                services: ['nova']
+            }
+        }
+    }
+    """
+    service_json = monitor_key_get(service='admin',
+                                   key="cephx.services.{}".format(service))
+    try:
+        service = json.loads(service_json)
+    except TypeError:
+        service = None
+    except ValueError:
+        service = None
+    if service:
+        for permission, groups in service['group_names'].items():
+            for group in groups:
+                name = group
+                if namespace:
+                    name = "{}-{}".format(namespace, name)
+                service['groups'][group] = get_group(group_name=name)
+    else:
+        service = {'group_names': {}, 'groups': {}}
+    return service
+
+
+def get_group(group_name):
+    """
+    A group is a structure to hold data about a named group, structured as:
+    {
+        pools: ['glance'],
+        services: ['nova']
+    }
+    """
+    group_key = get_group_key(group_name=group_name)
+    group_json = monitor_key_get(service='admin', key=group_key)
+    try:
+        group = json.loads(group_json)
+    except TypeError:
+        group = None
+    except ValueError:
+        group = None
+    if not group:
+        group = {
+            'pools': [],
+            'services': []
+        }
+    return group
+
+
+def save_service(service_name, service):
+    """Persist a service in the monitor cluster"""
+    service['groups'] = {}
+    return monitor_key_set(service='admin',
+                           key="cephx.services.{}".format(service_name),
+                           value=json.dumps(service))
+
+
+def save_group(group, group_name):
+    """Persist a group in the monitor cluster"""
+    group_key = get_group_key(group_name=group_name)
+    return monitor_key_set(service='admin',
+                           key=group_key,
+                           value=json.dumps(group))
+
+
+def get_group_key(group_name):
+    """Build group key"""
+    return 'cephx.groups.{}'.format(group_name)
+
+
 def handle_erasure_pool(request, service):
     pool_name = request.get('name')
     erasure_profile = request.get('erasure-profile')
     quota = request.get('max-bytes')
     weight = request.get('weight')
+    group_name = request.get('group')
 
     if erasure_profile is None:
         erasure_profile = "default-canonical"
@@ -172,6 +332,13 @@ def handle_erasure_pool(request, service):
         log(msg, level=ERROR)
         return {'exit-code': 1, 'stderr': msg}
 
+    if group_name:
+        group_namespace = request.get('group-namespace')
+        # Add the pool to the group named "group_name"
+        add_pool_to_group(pool=pool_name,
+                          group=group_name,
+                          namespace=group_namespace)
+
     # TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds
     if not erasure_profile_exists(service=service, name=erasure_profile):
         # TODO: Fail and tell them to create the profile or default
@@ -200,6 +367,7 @@ def handle_replicated_pool(request, service):
     replicas = request.get('replicas')
     quota = request.get('max-bytes')
     weight = request.get('weight')
+    group_name = request.get('group')
 
     # Optional params
     pg_num = request.get('pg_num')
@@ -215,6 +383,13 @@ def handle_replicated_pool(request, service):
         log(msg, level=ERROR)
         return {'exit-code': 1, 'stderr': msg}
 
+    if group_name:
+        group_namespace = request.get('group-namespace')
+        # Add the pool to the group named "group_name"
+        add_pool_to_group(pool=pool_name,
+                          group=group_name,
+                          namespace=group_namespace)
+
     kwargs = {}
     if pg_num:
         kwargs['pg_num'] = pg_num
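
For completeness, a replicated create-pool request can now carry the group field handled above; a rough sketch (values invented, and assuming the existing create-pool op is what routes to handle_replicated_pool):

    # Illustration only: create a replicated pool and attach it to a group so
    # that every service already in "images" is re-granted access via
    # add_pool_to_group() -> update_service_permissions().
    request = {
        'op': 'create-pool',
        'name': 'glance',
        'replicas': 3,
        'group': 'images',          # new in this change
        'group-namespace': None,    # optional
    }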
@@ -570,6 +745,8 @@ def process_requests_v1(reqs):
             ret = handle_rgw_create_user(request=req, service=svc)
         elif op == "move-osd-to-bucket":
             ret = handle_put_osd_in_bucket(request=req, service=svc)
+        elif op == "add-permissions-to-key":
+            ret = handle_add_permissions_to_key(request=req, service=svc)
         else:
             msg = "Unknown operation '%s'" % op
             log(msg, level=ERROR)