Use charms.ceph for Ceph broker
Drop use of local copy of ceph_broker.py in preference to the centrally maintained copy in charms.ceph. Change-Id: I89aa0f9fc7d5d2d480ebabc1cb17a86dcbef21bf
This commit is contained in:
parent
8cc52727a2
commit
f84e71392c
7
Makefile
7
Makefile
@ -18,3 +18,10 @@ bin/charm_helpers_sync.py:
|
|||||||
|
|
||||||
sync: bin/charm_helpers_sync.py
|
sync: bin/charm_helpers_sync.py
|
||||||
$(PYTHON) bin/charm_helpers_sync.py -c charm-helpers-hooks.yaml
|
$(PYTHON) bin/charm_helpers_sync.py -c charm-helpers-hooks.yaml
|
||||||
|
|
||||||
|
bin/git_sync.py:
|
||||||
|
@mkdir -p bin
|
||||||
|
@wget -O bin/git_sync.py https://raw.githubusercontent.com/CanonicalLtd/git-sync/master/git_sync.py
|
||||||
|
|
||||||
|
ceph-sync: bin/git_sync.py
|
||||||
|
$(PYTHON) bin/git_sync.py -d lib -s https://github.com/openstack/charms.ceph.git
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -16,6 +16,7 @@ import sys
|
|||||||
|
|
||||||
_path = os.path.dirname(os.path.realpath(__file__))
|
_path = os.path.dirname(os.path.realpath(__file__))
|
||||||
_root = os.path.abspath(os.path.join(_path, '..'))
|
_root = os.path.abspath(os.path.join(_path, '..'))
|
||||||
|
_lib = os.path.abspath(os.path.join(_path, '../lib'))
|
||||||
|
|
||||||
|
|
||||||
def _add_path(path):
|
def _add_path(path):
|
||||||
@ -24,6 +25,7 @@ def _add_path(path):
|
|||||||
|
|
||||||
|
|
||||||
_add_path(_root)
|
_add_path(_root)
|
||||||
|
_add_path(_lib)
|
||||||
|
|
||||||
import ceph
|
import ceph
|
||||||
from charmhelpers.core.hookenv import (
|
from charmhelpers.core.hookenv import (
|
||||||
@ -63,7 +65,7 @@ from charmhelpers.contrib.openstack.utils import (
|
|||||||
|
|
||||||
from charmhelpers.core.templating import render
|
from charmhelpers.core.templating import render
|
||||||
|
|
||||||
from ceph_broker import (
|
from charms_ceph.broker import (
|
||||||
process_requests
|
process_requests
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
# Wrapper to deal with newer Ubuntu versions that don't have py2 installed
|
# Wrapper to deal with newer Ubuntu versions that don't have py2 installed
|
||||||
# by default.
|
# by default.
|
||||||
|
|
||||||
declare -a DEPS=('apt' 'netaddr' 'netifaces' 'pip' 'yaml' 'dnspython')
|
declare -a DEPS=('apt' 'netaddr' 'netifaces' 'pip' 'yaml')
|
||||||
|
|
||||||
check_and_install() {
|
check_and_install() {
|
||||||
pkg="${1}-${2}"
|
pkg="${1}-${2}"
|
||||||
@ -17,4 +17,5 @@ for dep in ${DEPS[@]}; do
|
|||||||
check_and_install ${PYTHON} ${dep}
|
check_and_install ${PYTHON} ${dep}
|
||||||
done
|
done
|
||||||
|
|
||||||
|
./hooks/install_deps
|
||||||
exec ./hooks/install.real
|
exec ./hooks/install.real
|
||||||
|
18
hooks/install_deps
Executable file
18
hooks/install_deps
Executable file
@ -0,0 +1,18 @@
|
|||||||
|
#!/bin/bash -e
|
||||||
|
# Wrapper to ensure that python dependencies are installed before we get into
|
||||||
|
# the python part of the hook execution
|
||||||
|
|
||||||
|
declare -a DEPS=('dnspython' 'pyudev')
|
||||||
|
|
||||||
|
check_and_install() {
|
||||||
|
pkg="${1}-${2}"
|
||||||
|
if ! dpkg -s ${pkg} 2>&1 > /dev/null; then
|
||||||
|
apt-get -y install ${pkg}
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
PYTHON="python3"
|
||||||
|
|
||||||
|
for dep in ${DEPS[@]}; do
|
||||||
|
check_and_install ${PYTHON} ${dep}
|
||||||
|
done
|
6
hooks/upgrade-charm
Executable file
6
hooks/upgrade-charm
Executable file
@ -0,0 +1,6 @@
|
|||||||
|
#!/bin/bash -e
|
||||||
|
# Wrapper to ensure that old python bytecode isn't hanging around
|
||||||
|
# after we upgrade the charm with newer libraries
|
||||||
|
rm -rf **/*.pyc
|
||||||
|
|
||||||
|
./hooks/install_deps
|
@ -1,3 +0,0 @@
|
|||||||
This file was created by release-tools to ensure that this empty
|
|
||||||
directory is preserved in vcs re: lint check definitions in global
|
|
||||||
tox.ini files. This file can be removed if/when this dir is actually in use.
|
|
0
lib/charms_ceph/__init__.py
Normal file
0
lib/charms_ceph/__init__.py
Normal file
@ -1,12 +1,29 @@
|
|||||||
#!/usr/bin/python
|
# Copyright 2016 Canonical Ltd
|
||||||
#
|
#
|
||||||
# Copyright 2015 Canonical Ltd.
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
#
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
import json
|
import json
|
||||||
import six
|
import os
|
||||||
from subprocess import check_call, CalledProcessError
|
|
||||||
|
|
||||||
|
from subprocess import check_call, check_output, CalledProcessError
|
||||||
|
from tempfile import NamedTemporaryFile
|
||||||
|
|
||||||
|
from charms_ceph.utils import (
|
||||||
|
get_cephfs,
|
||||||
|
get_osd_weight
|
||||||
|
)
|
||||||
|
from charms_ceph.crush_utils import Crushmap
|
||||||
|
|
||||||
from charmhelpers.core.hookenv import (
|
from charmhelpers.core.hookenv import (
|
||||||
log,
|
log,
|
||||||
@ -25,18 +42,17 @@ from charmhelpers.contrib.storage.linux.ceph import (
|
|||||||
pool_set,
|
pool_set,
|
||||||
remove_pool_snapshot,
|
remove_pool_snapshot,
|
||||||
rename_pool,
|
rename_pool,
|
||||||
set_pool_quota,
|
|
||||||
snapshot_pool,
|
snapshot_pool,
|
||||||
validator,
|
validator,
|
||||||
ErasurePool,
|
ErasurePool,
|
||||||
Pool,
|
BasePool,
|
||||||
ReplicatedPool,
|
ReplicatedPool,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
|
# This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
|
||||||
# This should do a decent job of preventing people from passing in bad values.
|
# This should do a decent job of preventing people from passing in bad values.
|
||||||
# It will give a useful error message
|
# It will give a useful error message
|
||||||
|
|
||||||
POOL_KEYS = {
|
POOL_KEYS = {
|
||||||
# "Ceph Key Name": [Python type, [Valid Range]]
|
# "Ceph Key Name": [Python type, [Valid Range]]
|
||||||
"size": [int],
|
"size": [int],
|
||||||
@ -51,7 +67,7 @@ POOL_KEYS = {
|
|||||||
"write_fadvise_dontneed": [bool],
|
"write_fadvise_dontneed": [bool],
|
||||||
"noscrub": [bool],
|
"noscrub": [bool],
|
||||||
"nodeep-scrub": [bool],
|
"nodeep-scrub": [bool],
|
||||||
"hit_set_type": [six.string_types, ["bloom", "explicit_hash",
|
"hit_set_type": [str, ["bloom", "explicit_hash",
|
||||||
"explicit_object"]],
|
"explicit_object"]],
|
||||||
"hit_set_count": [int, [1, 1]],
|
"hit_set_count": [int, [1, 1]],
|
||||||
"hit_set_period": [int],
|
"hit_set_period": [int],
|
||||||
@ -64,6 +80,11 @@ POOL_KEYS = {
|
|||||||
"cache_min_flush_age": [int],
|
"cache_min_flush_age": [int],
|
||||||
"cache_min_evict_age": [int],
|
"cache_min_evict_age": [int],
|
||||||
"fast_read": [bool],
|
"fast_read": [bool],
|
||||||
|
"allow_ec_overwrites": [bool],
|
||||||
|
"compression_mode": [str, ["none", "passive", "aggressive", "force"]],
|
||||||
|
"compression_algorithm": [str, ["lz4", "snappy", "zlib", "zstd"]],
|
||||||
|
"compression_required_ratio": [float, [0.0, 1.0]],
|
||||||
|
"crush_rule": [str],
|
||||||
}
|
}
|
||||||
|
|
||||||
CEPH_BUCKET_TYPES = [
|
CEPH_BUCKET_TYPES = [
|
||||||
@ -96,6 +117,9 @@ def process_requests(reqs):
|
|||||||
|
|
||||||
This is a versioned api. API version must be supplied by the client making
|
This is a versioned api. API version must be supplied by the client making
|
||||||
the request.
|
the request.
|
||||||
|
|
||||||
|
:param reqs: dict of request parameters.
|
||||||
|
:returns: dict. exit-code and reason if not 0
|
||||||
"""
|
"""
|
||||||
request_id = reqs.get('request-id')
|
request_id = reqs.get('request-id')
|
||||||
try:
|
try:
|
||||||
@ -115,7 +139,7 @@ def process_requests(reqs):
|
|||||||
log(msg, level=ERROR)
|
log(msg, level=ERROR)
|
||||||
return {'exit-code': 1, 'stderr': msg}
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
|
||||||
msg = ("Missing or invalid api version (%s)" % version)
|
msg = ("Missing or invalid api version ({})".format(version))
|
||||||
resp = {'exit-code': 1, 'stderr': msg}
|
resp = {'exit-code': 1, 'stderr': msg}
|
||||||
if request_id:
|
if request_id:
|
||||||
resp['request-id'] = request_id
|
resp['request-id'] = request_id
|
||||||
@ -124,200 +148,53 @@ def process_requests(reqs):
|
|||||||
|
|
||||||
|
|
||||||
def handle_create_erasure_profile(request, service):
|
def handle_create_erasure_profile(request, service):
|
||||||
# "local" | "shec" or it defaults to "jerasure"
|
"""Create an erasure profile.
|
||||||
|
|
||||||
|
:param request: dict of request operations and params
|
||||||
|
:param service: The ceph client to run the command under.
|
||||||
|
:returns: dict. exit-code and reason if not 0
|
||||||
|
"""
|
||||||
|
# "isa" | "lrc" | "shec" | "clay" or it defaults to "jerasure"
|
||||||
erasure_type = request.get('erasure-type')
|
erasure_type = request.get('erasure-type')
|
||||||
# "host" | "rack" or it defaults to "host" # Any valid Ceph bucket
|
# dependent on erasure coding type
|
||||||
|
erasure_technique = request.get('erasure-technique')
|
||||||
|
# "host" | "rack" | ...
|
||||||
failure_domain = request.get('failure-domain')
|
failure_domain = request.get('failure-domain')
|
||||||
name = request.get('name')
|
name = request.get('name')
|
||||||
k = request.get('k')
|
# Binary Distribution Matrix (BDM) parameters
|
||||||
m = request.get('m')
|
bdm_k = request.get('k')
|
||||||
l = request.get('l')
|
bdm_m = request.get('m')
|
||||||
|
# LRC parameters
|
||||||
|
bdm_l = request.get('l')
|
||||||
|
crush_locality = request.get('crush-locality')
|
||||||
|
# SHEC parameters
|
||||||
|
bdm_c = request.get('c')
|
||||||
|
# CLAY parameters
|
||||||
|
bdm_d = request.get('d')
|
||||||
|
scalar_mds = request.get('scalar-mds')
|
||||||
|
# Device Class
|
||||||
|
device_class = request.get('device-class')
|
||||||
|
|
||||||
if failure_domain not in CEPH_BUCKET_TYPES:
|
if failure_domain and failure_domain not in CEPH_BUCKET_TYPES:
|
||||||
msg = "failure-domain must be one of {}".format(CEPH_BUCKET_TYPES)
|
msg = "failure-domain must be one of {}".format(CEPH_BUCKET_TYPES)
|
||||||
log(msg, level=ERROR)
|
log(msg, level=ERROR)
|
||||||
return {'exit-code': 1, 'stderr': msg}
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
|
||||||
create_erasure_profile(service=service, erasure_plugin_name=erasure_type,
|
create_erasure_profile(service=service,
|
||||||
profile_name=name, failure_domain=failure_domain,
|
erasure_plugin_name=erasure_type,
|
||||||
data_chunks=k, coding_chunks=m, locality=l)
|
profile_name=name,
|
||||||
|
failure_domain=failure_domain,
|
||||||
|
data_chunks=bdm_k,
|
||||||
|
coding_chunks=bdm_m,
|
||||||
|
locality=bdm_l,
|
||||||
|
durability_estimator=bdm_d,
|
||||||
|
helper_chunks=bdm_c,
|
||||||
|
scalar_mds=scalar_mds,
|
||||||
|
crush_locality=crush_locality,
|
||||||
|
device_class=device_class,
|
||||||
|
erasure_plugin_technique=erasure_technique)
|
||||||
|
|
||||||
|
return {'exit-code': 0}
|
||||||
def handle_erasure_pool(request, service):
|
|
||||||
"""Create a new erasure coded pool.
|
|
||||||
|
|
||||||
:param request: dict of request operations and params.
|
|
||||||
:param service: The ceph client to run the command under.
|
|
||||||
:returns: dict. exit-code and reason if not 0.
|
|
||||||
"""
|
|
||||||
pool_name = request.get('name')
|
|
||||||
erasure_profile = request.get('erasure-profile')
|
|
||||||
max_bytes = request.get('max-bytes')
|
|
||||||
max_objects = request.get('max-objects')
|
|
||||||
weight = request.get('weight')
|
|
||||||
group_name = request.get('group')
|
|
||||||
|
|
||||||
if erasure_profile is None:
|
|
||||||
erasure_profile = "default-canonical"
|
|
||||||
|
|
||||||
app_name = request.get('app-name')
|
|
||||||
|
|
||||||
# Check for missing params
|
|
||||||
if pool_name is None:
|
|
||||||
msg = "Missing parameter. name is required for the pool"
|
|
||||||
log(msg, level=ERROR)
|
|
||||||
return {'exit-code': 1, 'stderr': msg}
|
|
||||||
|
|
||||||
if group_name:
|
|
||||||
group_namespace = request.get('group-namespace')
|
|
||||||
# Add the pool to the group named "group_name"
|
|
||||||
add_pool_to_group(pool=pool_name,
|
|
||||||
group=group_name,
|
|
||||||
namespace=group_namespace)
|
|
||||||
|
|
||||||
# TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds
|
|
||||||
if not erasure_profile_exists(service=service, name=erasure_profile):
|
|
||||||
# TODO: Fail and tell them to create the profile or default
|
|
||||||
msg = ("erasure-profile {} does not exist. Please create it with: "
|
|
||||||
"create-erasure-profile".format(erasure_profile))
|
|
||||||
log(msg, level=ERROR)
|
|
||||||
return {'exit-code': 1, 'stderr': msg}
|
|
||||||
|
|
||||||
pool = ErasurePool(service=service, name=pool_name,
|
|
||||||
erasure_code_profile=erasure_profile,
|
|
||||||
percent_data=weight, app_name=app_name)
|
|
||||||
# Ok make the erasure pool
|
|
||||||
if not pool_exists(service=service, name=pool_name):
|
|
||||||
log("Creating pool '{}' (erasure_profile={})"
|
|
||||||
.format(pool.name, erasure_profile), level=INFO)
|
|
||||||
pool.create()
|
|
||||||
|
|
||||||
# Set a quota if requested
|
|
||||||
if max_bytes or max_objects:
|
|
||||||
set_pool_quota(service=service, pool_name=pool_name,
|
|
||||||
max_bytes=max_bytes, max_objects=max_objects)
|
|
||||||
|
|
||||||
|
|
||||||
def handle_replicated_pool(request, service):
|
|
||||||
"""Create a new replicated pool.
|
|
||||||
|
|
||||||
:param request: dict of request operations and params.
|
|
||||||
:param service: The ceph client to run the command under.
|
|
||||||
:returns: dict. exit-code and reason if not 0.
|
|
||||||
"""
|
|
||||||
pool_name = request.get('name')
|
|
||||||
replicas = request.get('replicas')
|
|
||||||
max_bytes = request.get('max-bytes')
|
|
||||||
max_objects = request.get('max-objects')
|
|
||||||
weight = request.get('weight')
|
|
||||||
group_name = request.get('group')
|
|
||||||
|
|
||||||
# Optional params
|
|
||||||
pg_num = request.get('pg_num')
|
|
||||||
if pg_num:
|
|
||||||
# Cap pg_num to max allowed just in case.
|
|
||||||
osds = get_osds(service)
|
|
||||||
if osds:
|
|
||||||
pg_num = min(pg_num, (len(osds) * 100 // replicas))
|
|
||||||
|
|
||||||
app_name = request.get('app-name')
|
|
||||||
# Check for missing params
|
|
||||||
if pool_name is None or replicas is None:
|
|
||||||
msg = "Missing parameter. name and replicas are required"
|
|
||||||
log(msg, level=ERROR)
|
|
||||||
return {'exit-code': 1, 'stderr': msg}
|
|
||||||
|
|
||||||
if group_name:
|
|
||||||
group_namespace = request.get('group-namespace')
|
|
||||||
# Add the pool to the group named "group_name"
|
|
||||||
add_pool_to_group(pool=pool_name,
|
|
||||||
group=group_name,
|
|
||||||
namespace=group_namespace)
|
|
||||||
|
|
||||||
kwargs = {}
|
|
||||||
if pg_num:
|
|
||||||
kwargs['pg_num'] = pg_num
|
|
||||||
if weight:
|
|
||||||
kwargs['percent_data'] = weight
|
|
||||||
if replicas:
|
|
||||||
kwargs['replicas'] = replicas
|
|
||||||
if app_name:
|
|
||||||
kwargs['app_name'] = app_name
|
|
||||||
|
|
||||||
pool = ReplicatedPool(service=service,
|
|
||||||
name=pool_name, **kwargs)
|
|
||||||
if not pool_exists(service=service, name=pool_name):
|
|
||||||
log("Creating pool '{}' (replicas={})".format(pool.name, replicas),
|
|
||||||
level=INFO)
|
|
||||||
pool.create()
|
|
||||||
else:
|
|
||||||
log("Pool '{}' already exists - skipping create".format(pool.name),
|
|
||||||
level=DEBUG)
|
|
||||||
|
|
||||||
# Set a quota if requested
|
|
||||||
if max_bytes or max_objects:
|
|
||||||
set_pool_quota(service=service, pool_name=pool_name,
|
|
||||||
max_bytes=max_bytes, max_objects=max_objects)
|
|
||||||
|
|
||||||
|
|
||||||
def handle_create_cache_tier(request, service):
|
|
||||||
# mode = "writeback" | "readonly"
|
|
||||||
storage_pool = request.get('cold-pool')
|
|
||||||
cache_pool = request.get('hot-pool')
|
|
||||||
cache_mode = request.get('mode')
|
|
||||||
|
|
||||||
if cache_mode is None:
|
|
||||||
cache_mode = "writeback"
|
|
||||||
|
|
||||||
# cache and storage pool must exist first
|
|
||||||
if not pool_exists(service=service, name=storage_pool) or not pool_exists(
|
|
||||||
service=service, name=cache_pool):
|
|
||||||
msg = ("cold-pool: {} and hot-pool: {} must exist. Please create "
|
|
||||||
"them first".format(storage_pool, cache_pool))
|
|
||||||
log(msg, level=ERROR)
|
|
||||||
return {'exit-code': 1, 'stderr': msg}
|
|
||||||
|
|
||||||
p = Pool(service=service, name=storage_pool)
|
|
||||||
p.add_cache_tier(cache_pool=cache_pool, mode=cache_mode)
|
|
||||||
|
|
||||||
|
|
||||||
def handle_remove_cache_tier(request, service):
|
|
||||||
storage_pool = request.get('cold-pool')
|
|
||||||
cache_pool = request.get('hot-pool')
|
|
||||||
# cache and storage pool must exist first
|
|
||||||
if not pool_exists(service=service, name=storage_pool) or not pool_exists(
|
|
||||||
service=service, name=cache_pool):
|
|
||||||
msg = ("cold-pool: {} or hot-pool: {} doesn't exist. Not "
|
|
||||||
"deleting cache tier".format(storage_pool, cache_pool))
|
|
||||||
log(msg, level=ERROR)
|
|
||||||
return {'exit-code': 1, 'stderr': msg}
|
|
||||||
|
|
||||||
pool = Pool(name=storage_pool, service=service)
|
|
||||||
pool.remove_cache_tier(cache_pool=cache_pool)
|
|
||||||
|
|
||||||
|
|
||||||
def handle_set_pool_value(request, service):
|
|
||||||
# Set arbitrary pool values
|
|
||||||
params = {'pool': request.get('name'),
|
|
||||||
'key': request.get('key'),
|
|
||||||
'value': request.get('value')}
|
|
||||||
if params['key'] not in POOL_KEYS:
|
|
||||||
msg = "Invalid key '%s'" % params['key']
|
|
||||||
log(msg, level=ERROR)
|
|
||||||
return {'exit-code': 1, 'stderr': msg}
|
|
||||||
|
|
||||||
# Get the validation method
|
|
||||||
validator_params = POOL_KEYS[params['key']]
|
|
||||||
if len(validator_params) is 1:
|
|
||||||
# Validate that what the user passed is actually legal per Ceph's rules
|
|
||||||
validator(params['value'], validator_params[0])
|
|
||||||
else:
|
|
||||||
# Validate that what the user passed is actually legal per Ceph's rules
|
|
||||||
validator(params['value'], validator_params[0], validator_params[1])
|
|
||||||
|
|
||||||
# Set the value
|
|
||||||
pool_set(service=service, pool_name=params['pool'], key=params['key'],
|
|
||||||
value=params['value'])
|
|
||||||
|
|
||||||
|
|
||||||
def handle_add_permissions_to_key(request, service):
|
def handle_add_permissions_to_key(request, service):
|
||||||
@ -358,6 +235,30 @@ def handle_add_permissions_to_key(request, service):
|
|||||||
return resp
|
return resp
|
||||||
|
|
||||||
|
|
||||||
|
def handle_set_key_permissions(request, service):
|
||||||
|
"""Ensure the key has the requested permissions."""
|
||||||
|
permissions = request.get('permissions')
|
||||||
|
client = request.get('client')
|
||||||
|
call = ['ceph', '--id', service, 'auth', 'caps',
|
||||||
|
'client.{}'.format(client)] + permissions
|
||||||
|
try:
|
||||||
|
check_call(call)
|
||||||
|
except CalledProcessError as e:
|
||||||
|
log("Error updating key capabilities: {}".format(e), level=ERROR)
|
||||||
|
|
||||||
|
|
||||||
|
def update_service_permissions(service, service_obj=None, namespace=None):
|
||||||
|
"""Update the key permissions for the named client in Ceph"""
|
||||||
|
if not service_obj:
|
||||||
|
service_obj = get_service_groups(service=service, namespace=namespace)
|
||||||
|
permissions = pool_permission_list_for_service(service_obj)
|
||||||
|
call = ['ceph', 'auth', 'caps', 'client.{}'.format(service)] + permissions
|
||||||
|
try:
|
||||||
|
check_call(call)
|
||||||
|
except CalledProcessError as e:
|
||||||
|
log("Error updating key capabilities: {}".format(e))
|
||||||
|
|
||||||
|
|
||||||
def add_pool_to_group(pool, group, namespace=None):
|
def add_pool_to_group(pool, group, namespace=None):
|
||||||
"""Add a named pool to a named group"""
|
"""Add a named pool to a named group"""
|
||||||
group_name = group
|
group_name = group
|
||||||
@ -394,60 +295,6 @@ def pool_permission_list_for_service(service):
|
|||||||
'osd', ', '.join(permissions)]
|
'osd', ', '.join(permissions)]
|
||||||
|
|
||||||
|
|
||||||
def update_service_permissions(service, service_obj=None, namespace=None):
|
|
||||||
"""Update the key permissions for the named client in Ceph"""
|
|
||||||
if not service_obj:
|
|
||||||
service_obj = get_service_groups(service=service, namespace=namespace)
|
|
||||||
permissions = pool_permission_list_for_service(service_obj)
|
|
||||||
call = ['ceph', 'auth', 'caps', 'client.{}'.format(service)] + permissions
|
|
||||||
try:
|
|
||||||
check_call(call)
|
|
||||||
except CalledProcessError as e:
|
|
||||||
log("Error updating key capabilities: {}".format(e))
|
|
||||||
|
|
||||||
|
|
||||||
def save_service(service_name, service):
|
|
||||||
"""Persist a service in the monitor cluster"""
|
|
||||||
service['groups'] = {}
|
|
||||||
return monitor_key_set(service='admin',
|
|
||||||
key="cephx.services.{}".format(service_name),
|
|
||||||
value=json.dumps(service, sort_keys=True))
|
|
||||||
|
|
||||||
|
|
||||||
def save_group(group, group_name):
|
|
||||||
"""Persist a group in the monitor cluster"""
|
|
||||||
group_key = get_group_key(group_name=group_name)
|
|
||||||
return monitor_key_set(service='admin',
|
|
||||||
key=group_key,
|
|
||||||
value=json.dumps(group, sort_keys=True))
|
|
||||||
|
|
||||||
|
|
||||||
def get_group(group_name):
|
|
||||||
"""A group is a structure to hold data about a named group, structured as:
|
|
||||||
{
|
|
||||||
pools: ['glance'],
|
|
||||||
services: ['nova']
|
|
||||||
}
|
|
||||||
"""
|
|
||||||
group_key = get_group_key(group_name=group_name)
|
|
||||||
group_json = monitor_key_get(service='admin', key=group_key)
|
|
||||||
try:
|
|
||||||
group = json.loads(group_json)
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
group = None
|
|
||||||
if not group:
|
|
||||||
group = {
|
|
||||||
'pools': [],
|
|
||||||
'services': []
|
|
||||||
}
|
|
||||||
return group
|
|
||||||
|
|
||||||
|
|
||||||
def get_group_key(group_name):
|
|
||||||
"""Build group key"""
|
|
||||||
return 'cephx.groups.{}'.format(group_name)
|
|
||||||
|
|
||||||
|
|
||||||
def get_service_groups(service, namespace=None):
|
def get_service_groups(service, namespace=None):
|
||||||
"""Services are objects stored with some metadata, they look like (for a
|
"""Services are objects stored with some metadata, they look like (for a
|
||||||
service named "nova"):
|
service named "nova"):
|
||||||
@ -506,6 +353,478 @@ def _build_service_groups(service, namespace=None):
|
|||||||
return all_groups
|
return all_groups
|
||||||
|
|
||||||
|
|
||||||
|
def get_group(group_name):
|
||||||
|
"""A group is a structure to hold data about a named group, structured as:
|
||||||
|
{
|
||||||
|
pools: ['glance'],
|
||||||
|
services: ['nova']
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
group_key = get_group_key(group_name=group_name)
|
||||||
|
group_json = monitor_key_get(service='admin', key=group_key)
|
||||||
|
try:
|
||||||
|
group = json.loads(group_json)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
group = None
|
||||||
|
if not group:
|
||||||
|
group = {
|
||||||
|
'pools': [],
|
||||||
|
'services': []
|
||||||
|
}
|
||||||
|
return group
|
||||||
|
|
||||||
|
|
||||||
|
def save_service(service_name, service):
|
||||||
|
"""Persist a service in the monitor cluster"""
|
||||||
|
service['groups'] = {}
|
||||||
|
return monitor_key_set(service='admin',
|
||||||
|
key="cephx.services.{}".format(service_name),
|
||||||
|
value=json.dumps(service, sort_keys=True))
|
||||||
|
|
||||||
|
|
||||||
|
def save_group(group, group_name):
|
||||||
|
"""Persist a group in the monitor cluster"""
|
||||||
|
group_key = get_group_key(group_name=group_name)
|
||||||
|
return monitor_key_set(service='admin',
|
||||||
|
key=group_key,
|
||||||
|
value=json.dumps(group, sort_keys=True))
|
||||||
|
|
||||||
|
|
||||||
|
def get_group_key(group_name):
|
||||||
|
"""Build group key"""
|
||||||
|
return 'cephx.groups.{}'.format(group_name)
|
||||||
|
|
||||||
|
|
||||||
|
def handle_erasure_pool(request, service):
|
||||||
|
"""Create a new erasure coded pool.
|
||||||
|
|
||||||
|
:param request: dict of request operations and params.
|
||||||
|
:param service: The ceph client to run the command under.
|
||||||
|
:returns: dict. exit-code and reason if not 0.
|
||||||
|
"""
|
||||||
|
pool_name = request.get('name')
|
||||||
|
erasure_profile = request.get('erasure-profile')
|
||||||
|
group_name = request.get('group')
|
||||||
|
|
||||||
|
if erasure_profile is None:
|
||||||
|
erasure_profile = "default-canonical"
|
||||||
|
|
||||||
|
if group_name:
|
||||||
|
group_namespace = request.get('group-namespace')
|
||||||
|
# Add the pool to the group named "group_name"
|
||||||
|
add_pool_to_group(pool=pool_name,
|
||||||
|
group=group_name,
|
||||||
|
namespace=group_namespace)
|
||||||
|
|
||||||
|
# TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds
|
||||||
|
if not erasure_profile_exists(service=service, name=erasure_profile):
|
||||||
|
# TODO: Fail and tell them to create the profile or default
|
||||||
|
msg = ("erasure-profile {} does not exist. Please create it with: "
|
||||||
|
"create-erasure-profile".format(erasure_profile))
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
|
||||||
|
try:
|
||||||
|
pool = ErasurePool(service=service,
|
||||||
|
op=request)
|
||||||
|
except KeyError:
|
||||||
|
msg = "Missing parameter."
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
|
||||||
|
# Ok make the erasure pool
|
||||||
|
if not pool_exists(service=service, name=pool_name):
|
||||||
|
log("Creating pool '{}' (erasure_profile={})"
|
||||||
|
.format(pool.name, erasure_profile), level=INFO)
|
||||||
|
pool.create()
|
||||||
|
|
||||||
|
# Set/update properties that are allowed to change after pool creation.
|
||||||
|
pool.update()
|
||||||
|
|
||||||
|
|
||||||
|
def handle_replicated_pool(request, service):
|
||||||
|
"""Create a new replicated pool.
|
||||||
|
|
||||||
|
:param request: dict of request operations and params.
|
||||||
|
:param service: The ceph client to run the command under.
|
||||||
|
:returns: dict. exit-code and reason if not 0.
|
||||||
|
"""
|
||||||
|
pool_name = request.get('name')
|
||||||
|
group_name = request.get('group')
|
||||||
|
|
||||||
|
# Optional params
|
||||||
|
# NOTE: Check this against the handling in the Pool classes, reconcile and
|
||||||
|
# remove.
|
||||||
|
pg_num = request.get('pg_num')
|
||||||
|
replicas = request.get('replicas')
|
||||||
|
if pg_num:
|
||||||
|
# Cap pg_num to max allowed just in case.
|
||||||
|
osds = get_osds(service)
|
||||||
|
if osds:
|
||||||
|
pg_num = min(pg_num, (len(osds) * 100 // replicas))
|
||||||
|
request.update({'pg_num': pg_num})
|
||||||
|
|
||||||
|
if group_name:
|
||||||
|
group_namespace = request.get('group-namespace')
|
||||||
|
# Add the pool to the group named "group_name"
|
||||||
|
add_pool_to_group(pool=pool_name,
|
||||||
|
group=group_name,
|
||||||
|
namespace=group_namespace)
|
||||||
|
|
||||||
|
try:
|
||||||
|
pool = ReplicatedPool(service=service,
|
||||||
|
op=request)
|
||||||
|
except KeyError:
|
||||||
|
msg = "Missing parameter."
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
|
||||||
|
if not pool_exists(service=service, name=pool_name):
|
||||||
|
log("Creating pool '{}' (replicas={})".format(pool.name, replicas),
|
||||||
|
level=INFO)
|
||||||
|
pool.create()
|
||||||
|
else:
|
||||||
|
log("Pool '{}' already exists - skipping create".format(pool.name),
|
||||||
|
level=DEBUG)
|
||||||
|
|
||||||
|
# Set/update properties that are allowed to change after pool creation.
|
||||||
|
pool.update()
|
||||||
|
|
||||||
|
|
||||||
|
def handle_create_cache_tier(request, service):
|
||||||
|
"""Create a cache tier on a cold pool. Modes supported are
|
||||||
|
"writeback" and "readonly".
|
||||||
|
|
||||||
|
:param request: dict of request operations and params
|
||||||
|
:param service: The ceph client to run the command under.
|
||||||
|
:returns: dict. exit-code and reason if not 0
|
||||||
|
"""
|
||||||
|
# mode = "writeback" | "readonly"
|
||||||
|
storage_pool = request.get('cold-pool')
|
||||||
|
cache_pool = request.get('hot-pool')
|
||||||
|
cache_mode = request.get('mode')
|
||||||
|
|
||||||
|
if cache_mode is None:
|
||||||
|
cache_mode = "writeback"
|
||||||
|
|
||||||
|
# cache and storage pool must exist first
|
||||||
|
if not pool_exists(service=service, name=storage_pool) or not pool_exists(
|
||||||
|
service=service, name=cache_pool):
|
||||||
|
msg = ("cold-pool: {} and hot-pool: {} must exist. Please create "
|
||||||
|
"them first".format(storage_pool, cache_pool))
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
|
||||||
|
p = BasePool(service=service, name=storage_pool)
|
||||||
|
p.add_cache_tier(cache_pool=cache_pool, mode=cache_mode)
|
||||||
|
|
||||||
|
|
||||||
|
def handle_remove_cache_tier(request, service):
|
||||||
|
"""Remove a cache tier from the cold pool.
|
||||||
|
|
||||||
|
:param request: dict of request operations and params
|
||||||
|
:param service: The ceph client to run the command under.
|
||||||
|
:returns: dict. exit-code and reason if not 0
|
||||||
|
"""
|
||||||
|
storage_pool = request.get('cold-pool')
|
||||||
|
cache_pool = request.get('hot-pool')
|
||||||
|
# cache and storage pool must exist first
|
||||||
|
if not pool_exists(service=service, name=storage_pool) or not pool_exists(
|
||||||
|
service=service, name=cache_pool):
|
||||||
|
msg = ("cold-pool: {} or hot-pool: {} doesn't exist. Not "
|
||||||
|
"deleting cache tier".format(storage_pool, cache_pool))
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
|
||||||
|
pool = BasePool(name=storage_pool, service=service)
|
||||||
|
pool.remove_cache_tier(cache_pool=cache_pool)
|
||||||
|
|
||||||
|
|
||||||
|
def handle_set_pool_value(request, service, coerce=False):
|
||||||
|
"""Sets an arbitrary pool value.
|
||||||
|
|
||||||
|
:param request: dict of request operations and params
|
||||||
|
:param service: The ceph client to run the command under.
|
||||||
|
:param coerce: Try to parse/coerce the value into the correct type.
|
||||||
|
Used by the action code that only gets Str from Juju
|
||||||
|
:returns: dict. exit-code and reason if not 0
|
||||||
|
"""
|
||||||
|
# Set arbitrary pool values
|
||||||
|
params = {'pool': request.get('name'),
|
||||||
|
'key': request.get('key'),
|
||||||
|
'value': request.get('value')}
|
||||||
|
if params['key'] not in POOL_KEYS:
|
||||||
|
msg = "Invalid key '{}'".format(params['key'])
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
|
||||||
|
# Get the validation method
|
||||||
|
validator_params = POOL_KEYS[params['key']]
|
||||||
|
# BUG: #1838650 - the function needs to try to coerce the value param to
|
||||||
|
# the type required for the validator to pass. Note, if this blows, then
|
||||||
|
# the param isn't parsable to the correct type.
|
||||||
|
if coerce:
|
||||||
|
try:
|
||||||
|
params['value'] = validator_params[0](params['value'])
|
||||||
|
except ValueError:
|
||||||
|
raise RuntimeError("Value {} isn't of type {}"
|
||||||
|
.format(params['value'], validator_params[0]))
|
||||||
|
# end of BUG: #1838650
|
||||||
|
if len(validator_params) == 1:
|
||||||
|
# Validate that what the user passed is actually legal per Ceph's rules
|
||||||
|
validator(params['value'], validator_params[0])
|
||||||
|
else:
|
||||||
|
# Validate that what the user passed is actually legal per Ceph's rules
|
||||||
|
validator(params['value'], validator_params[0], validator_params[1])
|
||||||
|
|
||||||
|
# Set the value
|
||||||
|
pool_set(service=service, pool_name=params['pool'], key=params['key'],
|
||||||
|
value=params['value'])
|
||||||
|
|
||||||
|
|
||||||
|
def handle_rgw_regionmap_update(request, service):
|
||||||
|
"""Change the radosgw region map.
|
||||||
|
|
||||||
|
:param request: dict of request operations and params
|
||||||
|
:param service: The ceph client to run the command under.
|
||||||
|
:returns: dict. exit-code and reason if not 0
|
||||||
|
"""
|
||||||
|
name = request.get('client-name')
|
||||||
|
if not name:
|
||||||
|
msg = "Missing rgw-region or client-name params"
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
try:
|
||||||
|
check_output(['radosgw-admin',
|
||||||
|
'--id', service,
|
||||||
|
'regionmap', 'update', '--name', name])
|
||||||
|
except CalledProcessError as err:
|
||||||
|
log(err.output, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': err.output}
|
||||||
|
|
||||||
|
|
||||||
|
def handle_rgw_regionmap_default(request, service):
|
||||||
|
"""Create a radosgw region map.
|
||||||
|
|
||||||
|
:param request: dict of request operations and params
|
||||||
|
:param service: The ceph client to run the command under.
|
||||||
|
:returns: dict. exit-code and reason if not 0
|
||||||
|
"""
|
||||||
|
region = request.get('rgw-region')
|
||||||
|
name = request.get('client-name')
|
||||||
|
if not region or not name:
|
||||||
|
msg = "Missing rgw-region or client-name params"
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
try:
|
||||||
|
check_output(
|
||||||
|
[
|
||||||
|
'radosgw-admin',
|
||||||
|
'--id', service,
|
||||||
|
'regionmap',
|
||||||
|
'default',
|
||||||
|
'--rgw-region', region,
|
||||||
|
'--name', name])
|
||||||
|
except CalledProcessError as err:
|
||||||
|
log(err.output, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': err.output}
|
||||||
|
|
||||||
|
|
||||||
|
def handle_rgw_zone_set(request, service):
|
||||||
|
"""Create a radosgw zone.
|
||||||
|
|
||||||
|
:param request: dict of request operations and params
|
||||||
|
:param service: The ceph client to run the command under.
|
||||||
|
:returns: dict. exit-code and reason if not 0
|
||||||
|
"""
|
||||||
|
json_file = request.get('zone-json')
|
||||||
|
name = request.get('client-name')
|
||||||
|
region_name = request.get('region-name')
|
||||||
|
zone_name = request.get('zone-name')
|
||||||
|
if not json_file or not name or not region_name or not zone_name:
|
||||||
|
msg = "Missing json-file or client-name params"
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
infile = NamedTemporaryFile(delete=False)
|
||||||
|
with open(infile.name, 'w') as infile_handle:
|
||||||
|
infile_handle.write(json_file)
|
||||||
|
try:
|
||||||
|
check_output(
|
||||||
|
[
|
||||||
|
'radosgw-admin',
|
||||||
|
'--id', service,
|
||||||
|
'zone',
|
||||||
|
'set',
|
||||||
|
'--rgw-zone', zone_name,
|
||||||
|
'--infile', infile.name,
|
||||||
|
'--name', name,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
except CalledProcessError as err:
|
||||||
|
log(err.output, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': err.output}
|
||||||
|
os.unlink(infile.name)
|
||||||
|
|
||||||
|
|
||||||
|
def handle_put_osd_in_bucket(request, service):
|
||||||
|
"""Move an osd into a specified crush bucket.
|
||||||
|
|
||||||
|
:param request: dict of request operations and params
|
||||||
|
:param service: The ceph client to run the command under.
|
||||||
|
:returns: dict. exit-code and reason if not 0
|
||||||
|
"""
|
||||||
|
osd_id = request.get('osd')
|
||||||
|
target_bucket = request.get('bucket')
|
||||||
|
if not osd_id or not target_bucket:
|
||||||
|
msg = "Missing OSD ID or Bucket"
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
crushmap = Crushmap()
|
||||||
|
try:
|
||||||
|
crushmap.ensure_bucket_is_present(target_bucket)
|
||||||
|
check_output(
|
||||||
|
[
|
||||||
|
'ceph',
|
||||||
|
'--id', service,
|
||||||
|
'osd',
|
||||||
|
'crush',
|
||||||
|
'set',
|
||||||
|
str(osd_id),
|
||||||
|
str(get_osd_weight(osd_id)),
|
||||||
|
"root={}".format(target_bucket)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
msg = "Failed to move OSD " \
|
||||||
|
"{} into Bucket {} :: {}".format(osd_id, target_bucket, exc)
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
|
||||||
|
|
||||||
|
def handle_rgw_create_user(request, service):
|
||||||
|
"""Create a new rados gateway user.
|
||||||
|
|
||||||
|
:param request: dict of request operations and params
|
||||||
|
:param service: The ceph client to run the command under.
|
||||||
|
:returns: dict. exit-code and reason if not 0
|
||||||
|
"""
|
||||||
|
user_id = request.get('rgw-uid')
|
||||||
|
display_name = request.get('display-name')
|
||||||
|
name = request.get('client-name')
|
||||||
|
if not name or not display_name or not user_id:
|
||||||
|
msg = "Missing client-name, display-name or rgw-uid"
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
try:
|
||||||
|
create_output = check_output(
|
||||||
|
[
|
||||||
|
'radosgw-admin',
|
||||||
|
'--id', service,
|
||||||
|
'user',
|
||||||
|
'create',
|
||||||
|
'--uid', user_id,
|
||||||
|
'--display-name', display_name,
|
||||||
|
'--name', name,
|
||||||
|
'--system'
|
||||||
|
]
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
user_json = json.loads(str(create_output.decode('UTF-8')))
|
||||||
|
return {'exit-code': 0, 'user': user_json}
|
||||||
|
except ValueError as err:
|
||||||
|
log(err, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': err}
|
||||||
|
|
||||||
|
except CalledProcessError as err:
|
||||||
|
log(err.output, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': err.output}
|
||||||
|
|
||||||
|
|
||||||
|
def handle_create_cephfs(request, service):
|
||||||
|
"""Create a new cephfs.
|
||||||
|
|
||||||
|
:param request: The broker request
|
||||||
|
:param service: The ceph client to run the command under.
|
||||||
|
:returns: dict. exit-code and reason if not 0
|
||||||
|
"""
|
||||||
|
cephfs_name = request.get('mds_name')
|
||||||
|
data_pool = request.get('data_pool')
|
||||||
|
metadata_pool = request.get('metadata_pool')
|
||||||
|
# Check if the user params were provided
|
||||||
|
if not cephfs_name or not data_pool or not metadata_pool:
|
||||||
|
msg = "Missing mds_name, data_pool or metadata_pool params"
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
|
||||||
|
# Sanity check that the required pools exist
|
||||||
|
if not pool_exists(service=service, name=data_pool):
|
||||||
|
msg = "CephFS data pool does not exist. Cannot create CephFS"
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
if not pool_exists(service=service, name=metadata_pool):
|
||||||
|
msg = "CephFS metadata pool does not exist. Cannot create CephFS"
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
|
||||||
|
if get_cephfs(service=service):
|
||||||
|
# CephFS new has already been called
|
||||||
|
log("CephFS already created")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Finally create CephFS
|
||||||
|
try:
|
||||||
|
check_output(["ceph",
|
||||||
|
'--id', service,
|
||||||
|
"fs", "new", cephfs_name,
|
||||||
|
metadata_pool,
|
||||||
|
data_pool])
|
||||||
|
except CalledProcessError as err:
|
||||||
|
if err.returncode == 22:
|
||||||
|
log("CephFS already created")
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
log(err.output, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': err.output}
|
||||||
|
|
||||||
|
|
||||||
|
def handle_rgw_region_set(request, service):
|
||||||
|
# radosgw-admin region set --infile us.json --name client.radosgw.us-east-1
|
||||||
|
"""Set the rados gateway region.
|
||||||
|
|
||||||
|
:param request: dict. The broker request.
|
||||||
|
:param service: The ceph client to run the command under.
|
||||||
|
:returns: dict. exit-code and reason if not 0
|
||||||
|
"""
|
||||||
|
json_file = request.get('region-json')
|
||||||
|
name = request.get('client-name')
|
||||||
|
region_name = request.get('region-name')
|
||||||
|
zone_name = request.get('zone-name')
|
||||||
|
if not json_file or not name or not region_name or not zone_name:
|
||||||
|
msg = "Missing json-file or client-name params"
|
||||||
|
log(msg, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
infile = NamedTemporaryFile(delete=False)
|
||||||
|
with open(infile.name, 'w') as infile_handle:
|
||||||
|
infile_handle.write(json_file)
|
||||||
|
try:
|
||||||
|
check_output(
|
||||||
|
[
|
||||||
|
'radosgw-admin',
|
||||||
|
'--id', service,
|
||||||
|
'region',
|
||||||
|
'set',
|
||||||
|
'--rgw-zone', zone_name,
|
||||||
|
'--infile', infile.name,
|
||||||
|
'--name', name,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
except CalledProcessError as err:
|
||||||
|
log(err.output, level=ERROR)
|
||||||
|
return {'exit-code': 1, 'stderr': err.output}
|
||||||
|
os.unlink(infile.name)
|
||||||
|
|
||||||
|
|
||||||
def process_requests_v1(reqs):
|
def process_requests_v1(reqs):
|
||||||
"""Process v1 requests.
|
"""Process v1 requests.
|
||||||
|
|
||||||
@ -516,10 +835,10 @@ def process_requests_v1(reqs):
|
|||||||
operation failed along with an explanation).
|
operation failed along with an explanation).
|
||||||
"""
|
"""
|
||||||
ret = None
|
ret = None
|
||||||
log("Processing %s ceph broker requests" % (len(reqs)), level=INFO)
|
log("Processing {} ceph broker requests".format(len(reqs)), level=INFO)
|
||||||
for req in reqs:
|
for req in reqs:
|
||||||
op = req.get('op')
|
op = req.get('op')
|
||||||
log("Processing op='%s'" % op, level=DEBUG)
|
log("Processing op='{}'".format(op), level=DEBUG)
|
||||||
# Use admin client since we do not have other client key locations
|
# Use admin client since we do not have other client key locations
|
||||||
# setup to use them for these operations.
|
# setup to use them for these operations.
|
||||||
svc = 'admin'
|
svc = 'admin'
|
||||||
@ -531,7 +850,8 @@ def process_requests_v1(reqs):
|
|||||||
ret = handle_erasure_pool(request=req, service=svc)
|
ret = handle_erasure_pool(request=req, service=svc)
|
||||||
else:
|
else:
|
||||||
ret = handle_replicated_pool(request=req, service=svc)
|
ret = handle_replicated_pool(request=req, service=svc)
|
||||||
|
elif op == "create-cephfs":
|
||||||
|
ret = handle_create_cephfs(request=req, service=svc)
|
||||||
elif op == "create-cache-tier":
|
elif op == "create-cache-tier":
|
||||||
ret = handle_create_cache_tier(request=req, service=svc)
|
ret = handle_create_cache_tier(request=req, service=svc)
|
||||||
elif op == "remove-cache-tier":
|
elif op == "remove-cache-tier":
|
||||||
@ -558,10 +878,24 @@ def process_requests_v1(reqs):
|
|||||||
snapshot_name=snapshot_name)
|
snapshot_name=snapshot_name)
|
||||||
elif op == "set-pool-value":
|
elif op == "set-pool-value":
|
||||||
ret = handle_set_pool_value(request=req, service=svc)
|
ret = handle_set_pool_value(request=req, service=svc)
|
||||||
|
elif op == "rgw-region-set":
|
||||||
|
ret = handle_rgw_region_set(request=req, service=svc)
|
||||||
|
elif op == "rgw-zone-set":
|
||||||
|
ret = handle_rgw_zone_set(request=req, service=svc)
|
||||||
|
elif op == "rgw-regionmap-update":
|
||||||
|
ret = handle_rgw_regionmap_update(request=req, service=svc)
|
||||||
|
elif op == "rgw-regionmap-default":
|
||||||
|
ret = handle_rgw_regionmap_default(request=req, service=svc)
|
||||||
|
elif op == "rgw-create-user":
|
||||||
|
ret = handle_rgw_create_user(request=req, service=svc)
|
||||||
|
elif op == "move-osd-to-bucket":
|
||||||
|
ret = handle_put_osd_in_bucket(request=req, service=svc)
|
||||||
elif op == "add-permissions-to-key":
|
elif op == "add-permissions-to-key":
|
||||||
ret = handle_add_permissions_to_key(request=req, service=svc)
|
ret = handle_add_permissions_to_key(request=req, service=svc)
|
||||||
|
elif op == 'set-key-permissions':
|
||||||
|
ret = handle_set_key_permissions(request=req, service=svc)
|
||||||
else:
|
else:
|
||||||
msg = "Unknown operation '%s'" % op
|
msg = "Unknown operation '{}'".format(op)
|
||||||
log(msg, level=ERROR)
|
log(msg, level=ERROR)
|
||||||
return {'exit-code': 1, 'stderr': msg}
|
return {'exit-code': 1, 'stderr': msg}
|
||||||
|
|
154
lib/charms_ceph/crush_utils.py
Normal file
154
lib/charms_ceph/crush_utils.py
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
# Copyright 2014 Canonical Limited.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import re
|
||||||
|
|
||||||
|
from subprocess import check_output, CalledProcessError
|
||||||
|
|
||||||
|
from charmhelpers.core.hookenv import (
|
||||||
|
log,
|
||||||
|
ERROR,
|
||||||
|
)
|
||||||
|
|
||||||
|
CRUSH_BUCKET = """root {name} {{
|
||||||
|
id {id} # do not change unnecessarily
|
||||||
|
# weight 0.000
|
||||||
|
alg straw2
|
||||||
|
hash 0 # rjenkins1
|
||||||
|
}}
|
||||||
|
|
||||||
|
rule {name} {{
|
||||||
|
ruleset 0
|
||||||
|
type replicated
|
||||||
|
min_size 1
|
||||||
|
max_size 10
|
||||||
|
step take {name}
|
||||||
|
step chooseleaf firstn 0 type host
|
||||||
|
step emit
|
||||||
|
}}"""
|
||||||
|
|
||||||
|
# This regular expression looks for a string like:
|
||||||
|
# root NAME {
|
||||||
|
# id NUMBER
|
||||||
|
# so that we can extract NAME and ID from the crushmap
|
||||||
|
CRUSHMAP_BUCKETS_RE = re.compile(r"root\s+(.+)\s+\{\s*id\s+(-?\d+)")
|
||||||
|
|
||||||
|
# This regular expression looks for ID strings in the crushmap like:
|
||||||
|
# id NUMBER
|
||||||
|
# so that we can extract the IDs from a crushmap
|
||||||
|
CRUSHMAP_ID_RE = re.compile(r"id\s+(-?\d+)")
|
||||||
|
|
||||||
|
|
||||||
|
class Crushmap(object):
|
||||||
|
"""An object oriented approach to Ceph crushmap management."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._crushmap = self.load_crushmap()
|
||||||
|
roots = re.findall(CRUSHMAP_BUCKETS_RE, self._crushmap)
|
||||||
|
buckets = []
|
||||||
|
ids = list(map(
|
||||||
|
lambda x: int(x),
|
||||||
|
re.findall(CRUSHMAP_ID_RE, self._crushmap)))
|
||||||
|
ids = sorted(ids)
|
||||||
|
if roots != []:
|
||||||
|
for root in roots:
|
||||||
|
buckets.append(CRUSHBucket(root[0], root[1], True))
|
||||||
|
|
||||||
|
self._buckets = buckets
|
||||||
|
if ids != []:
|
||||||
|
self._ids = ids
|
||||||
|
else:
|
||||||
|
self._ids = [0]
|
||||||
|
|
||||||
|
def load_crushmap(self):
|
||||||
|
try:
|
||||||
|
crush = str(check_output(['ceph', 'osd', 'getcrushmap'])
|
||||||
|
.decode('UTF-8'))
|
||||||
|
return str(check_output(['crushtool', '-d', '-'],
|
||||||
|
stdin=crush.stdout)
|
||||||
|
.decode('UTF-8'))
|
||||||
|
except CalledProcessError as e:
|
||||||
|
log("Error occured while loading and decompiling CRUSH map:"
|
||||||
|
"{}".format(e), ERROR)
|
||||||
|
raise "Failed to read CRUSH map"
|
||||||
|
|
||||||
|
def ensure_bucket_is_present(self, bucket_name):
|
||||||
|
if bucket_name not in [bucket.name for bucket in self.buckets()]:
|
||||||
|
self.add_bucket(bucket_name)
|
||||||
|
self.save()
|
||||||
|
|
||||||
|
def buckets(self):
|
||||||
|
"""Return a list of buckets that are in the Crushmap."""
|
||||||
|
return self._buckets
|
||||||
|
|
||||||
|
def add_bucket(self, bucket_name):
|
||||||
|
"""Add a named bucket to Ceph"""
|
||||||
|
new_id = min(self._ids) - 1
|
||||||
|
self._ids.append(new_id)
|
||||||
|
self._buckets.append(CRUSHBucket(bucket_name, new_id))
|
||||||
|
|
||||||
|
def save(self):
|
||||||
|
"""Persist Crushmap to Ceph"""
|
||||||
|
try:
|
||||||
|
crushmap = self.build_crushmap()
|
||||||
|
compiled = str(check_output(['crushtool', '-c', '/dev/stdin', '-o',
|
||||||
|
'/dev/stdout'], stdin=crushmap)
|
||||||
|
.decode('UTF-8'))
|
||||||
|
ceph_output = str(check_output(['ceph', 'osd', 'setcrushmap', '-i',
|
||||||
|
'/dev/stdin'], stdin=compiled)
|
||||||
|
.decode('UTF-8'))
|
||||||
|
return ceph_output
|
||||||
|
except CalledProcessError as e:
|
||||||
|
log("save error: {}".format(e))
|
||||||
|
raise "Failed to save CRUSH map."
|
||||||
|
|
||||||
|
def build_crushmap(self):
|
||||||
|
"""Modifies the current CRUSH map to include the new buckets"""
|
||||||
|
tmp_crushmap = self._crushmap
|
||||||
|
for bucket in self._buckets:
|
||||||
|
if not bucket.default:
|
||||||
|
tmp_crushmap = "{}\n\n{}".format(
|
||||||
|
tmp_crushmap,
|
||||||
|
Crushmap.bucket_string(bucket.name, bucket.id))
|
||||||
|
|
||||||
|
return tmp_crushmap
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def bucket_string(name, id):
|
||||||
|
return CRUSH_BUCKET.format(name=name, id=id)
|
||||||
|
|
||||||
|
|
||||||
|
class CRUSHBucket(object):
|
||||||
|
"""CRUSH bucket description object."""
|
||||||
|
|
||||||
|
def __init__(self, name, id, default=False):
|
||||||
|
self.name = name
|
||||||
|
self.id = int(id)
|
||||||
|
self.default = default
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "Bucket {{Name: {name}, ID: {id}}}".format(
|
||||||
|
name=self.name, id=self.id)
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
"""Override the default Equals behavior"""
|
||||||
|
if isinstance(other, self.__class__):
|
||||||
|
return self.__dict__ == other.__dict__
|
||||||
|
return NotImplemented
|
||||||
|
|
||||||
|
def __ne__(self, other):
|
||||||
|
"""Define a non-equality test"""
|
||||||
|
if isinstance(other, self.__class__):
|
||||||
|
return not self.__eq__(other)
|
||||||
|
return NotImplemented
|
3349
lib/charms_ceph/utils.py
Normal file
3349
lib/charms_ceph/utils.py
Normal file
File diff suppressed because it is too large
Load Diff
@ -1,136 +0,0 @@
|
|||||||
import json
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
import mock
|
|
||||||
|
|
||||||
import ceph_broker
|
|
||||||
|
|
||||||
|
|
||||||
class CephBrokerTestCase(unittest.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
super(CephBrokerTestCase, self).setUp()
|
|
||||||
|
|
||||||
@mock.patch('ceph_broker.log')
|
|
||||||
def test_process_requests_noop(self, mock_log):
|
|
||||||
req = json.dumps({'api-version': 1, 'ops': []})
|
|
||||||
rc = ceph_broker.process_requests(req)
|
|
||||||
self.assertEqual(json.loads(rc), {'exit-code': 0})
|
|
||||||
|
|
||||||
@mock.patch('ceph_broker.log')
|
|
||||||
def test_process_requests_missing_api_version(self, mock_log):
|
|
||||||
req = json.dumps({'ops': []})
|
|
||||||
rc = ceph_broker.process_requests(req)
|
|
||||||
self.assertEqual(json.loads(rc), {
|
|
||||||
'exit-code': 1,
|
|
||||||
'stderr': 'Missing or invalid api version (None)'})
|
|
||||||
|
|
||||||
@mock.patch('ceph_broker.log')
|
|
||||||
def test_process_requests_invalid_api_version(self, mock_log):
|
|
||||||
req = json.dumps({'api-version': 2, 'ops': []})
|
|
||||||
rc = ceph_broker.process_requests(req)
|
|
||||||
print("Return: {}".format(rc))
|
|
||||||
self.assertEqual(json.loads(rc),
|
|
||||||
{'exit-code': 1,
|
|
||||||
'stderr': 'Missing or invalid api version (2)'})
|
|
||||||
|
|
||||||
@mock.patch('ceph_broker.log')
|
|
||||||
def test_process_requests_invalid(self, mock_log):
|
|
||||||
reqs = json.dumps({'api-version': 1, 'ops': [{'op': 'invalid_op'}]})
|
|
||||||
rc = ceph_broker.process_requests(reqs)
|
|
||||||
self.assertEqual(json.loads(rc),
|
|
||||||
{'exit-code': 1,
|
|
||||||
'stderr': "Unknown operation 'invalid_op'"})
|
|
||||||
|
|
||||||
@mock.patch('ceph_broker.get_osds')
|
|
||||||
@mock.patch('ceph_broker.ReplicatedPool')
|
|
||||||
@mock.patch('ceph_broker.pool_exists')
|
|
||||||
@mock.patch('ceph_broker.log')
|
|
||||||
def test_process_requests_create_pool_w_pg_num(self, mock_log,
|
|
||||||
mock_pool_exists,
|
|
||||||
mock_replicated_pool,
|
|
||||||
mock_get_osds):
|
|
||||||
mock_get_osds.return_value = [0, 1, 2]
|
|
||||||
mock_pool_exists.return_value = False
|
|
||||||
reqs = json.dumps({'api-version': 1,
|
|
||||||
'ops': [{
|
|
||||||
'op': 'create-pool',
|
|
||||||
'name': 'foo',
|
|
||||||
'replicas': 3,
|
|
||||||
'pg_num': 100}]})
|
|
||||||
rc = ceph_broker.process_requests(reqs)
|
|
||||||
mock_pool_exists.assert_called_with(service='admin', name='foo')
|
|
||||||
mock_replicated_pool.assert_called_with(service='admin', name='foo',
|
|
||||||
replicas=3, pg_num=100)
|
|
||||||
self.assertEqual(json.loads(rc), {'exit-code': 0})
|
|
||||||
|
|
||||||
@mock.patch('ceph_broker.get_osds')
|
|
||||||
@mock.patch('ceph_broker.ReplicatedPool')
|
|
||||||
@mock.patch('ceph_broker.pool_exists')
|
|
||||||
@mock.patch('ceph_broker.log')
|
|
||||||
def test_process_requests_create_pool_w_pg_num_capped(self, mock_log,
|
|
||||||
mock_pool_exists,
|
|
||||||
mock_replicated_pool,
|
|
||||||
mock_get_osds):
|
|
||||||
mock_get_osds.return_value = [0, 1, 2]
|
|
||||||
mock_pool_exists.return_value = False
|
|
||||||
reqs = json.dumps({'api-version': 1,
|
|
||||||
'ops': [{
|
|
||||||
'op': 'create-pool',
|
|
||||||
'name': 'foo',
|
|
||||||
'replicas': 3,
|
|
||||||
'pg_num': 300}]})
|
|
||||||
rc = ceph_broker.process_requests(reqs)
|
|
||||||
mock_pool_exists.assert_called_with(service='admin',
|
|
||||||
name='foo')
|
|
||||||
mock_replicated_pool.assert_called_with(service='admin', name='foo',
|
|
||||||
replicas=3, pg_num=100)
|
|
||||||
self.assertEqual(json.loads(rc), {'exit-code': 0})
|
|
||||||
self.assertEqual(json.loads(rc), {'exit-code': 0})
|
|
||||||
|
|
||||||
@mock.patch('ceph_broker.ReplicatedPool')
|
|
||||||
@mock.patch('ceph_broker.pool_exists')
|
|
||||||
@mock.patch('ceph_broker.log')
|
|
||||||
def test_process_requests_create_pool_exists(self, mock_log,
|
|
||||||
mock_pool_exists,
|
|
||||||
mock_replicated_pool):
|
|
||||||
mock_pool_exists.return_value = True
|
|
||||||
reqs = json.dumps({'api-version': 1,
|
|
||||||
'ops': [{'op': 'create-pool',
|
|
||||||
'name': 'foo',
|
|
||||||
'replicas': 3}]})
|
|
||||||
rc = ceph_broker.process_requests(reqs)
|
|
||||||
mock_pool_exists.assert_called_with(service='admin',
|
|
||||||
name='foo')
|
|
||||||
self.assertFalse(mock_replicated_pool.create.called)
|
|
||||||
self.assertEqual(json.loads(rc), {'exit-code': 0})
|
|
||||||
|
|
||||||
@mock.patch('ceph_broker.ReplicatedPool')
|
|
||||||
@mock.patch('ceph_broker.pool_exists')
|
|
||||||
@mock.patch('ceph_broker.log')
|
|
||||||
def test_process_requests_create_pool_rid(self, mock_log,
|
|
||||||
mock_pool_exists,
|
|
||||||
mock_replicated_pool):
|
|
||||||
mock_pool_exists.return_value = False
|
|
||||||
reqs = json.dumps({'api-version': 1,
|
|
||||||
'request-id': '1ef5aede',
|
|
||||||
'ops': [{
|
|
||||||
'op': 'create-pool',
|
|
||||||
'name': 'foo',
|
|
||||||
'replicas': 3}]})
|
|
||||||
rc = ceph_broker.process_requests(reqs)
|
|
||||||
mock_pool_exists.assert_called_with(service='admin', name='foo')
|
|
||||||
mock_replicated_pool.assert_called_with(service='admin',
|
|
||||||
name='foo',
|
|
||||||
replicas=3)
|
|
||||||
self.assertEqual(json.loads(rc)['exit-code'], 0)
|
|
||||||
self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')
|
|
||||||
|
|
||||||
@mock.patch('ceph_broker.log')
|
|
||||||
def test_process_requests_invalid_api_rid(self, mock_log):
|
|
||||||
reqs = json.dumps({'api-version': 0, 'request-id': '1ef5aede',
|
|
||||||
'ops': [{'op': 'create-pool'}]})
|
|
||||||
rc = ceph_broker.process_requests(reqs)
|
|
||||||
self.assertEqual(json.loads(rc)['exit-code'], 1)
|
|
||||||
self.assertEqual(json.loads(rc)['stderr'],
|
|
||||||
"Missing or invalid api version (0)")
|
|
||||||
self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')
|
|
Loading…
Reference in New Issue
Block a user