[dosaboy,r=james-page] Add broker functionality

This commit is contained in:
James Page 2014-11-19 16:12:04 -06:00
commit ad7867fe77
14 changed files with 606 additions and 17 deletions

7
.coveragerc Normal file
View File

@ -0,0 +1,7 @@
[report]
# Regexes for lines to exclude from consideration
exclude_lines =
if __name__ == .__main__.:
include=
hooks/hooks.py
hooks/ceph*.py

View File

@ -2,9 +2,13 @@
PYTHON := /usr/bin/env python
lint:
@flake8 --exclude hooks/charmhelpers hooks tests
@flake8 --exclude hooks/charmhelpers hooks tests unit_tests
@charm proof
unit_test:
@echo Starting unit tests...
@$(PYTHON) /usr/bin/nosetests --nologcapture --with-coverage unit_tests
test:
@echo Starting Amulet tests...
# coreycb note: The -v should only be temporary until Amulet sends

View File

@ -5,6 +5,7 @@ include:
- fetch
- contrib.storage.linux:
- utils
- ceph
- payload.execd
- contrib.openstack.alternatives
- contrib.network.ip

0
hooks/__init__.py Normal file
View File

90
hooks/ceph_broker.py Normal file
View File

@ -0,0 +1,90 @@
#!/usr/bin/python
#
# Copyright 2014 Canonical Ltd.
#
import json
from charmhelpers.core.hookenv import (
log,
DEBUG,
INFO,
ERROR,
)
from charmhelpers.contrib.storage.linux.ceph import (
create_pool,
pool_exists,
)
def decode_req_encode_rsp(f):
"""Decorator to decode incoming requests and encode responses."""
def decode_inner(req):
return json.dumps(f(json.loads(req)))
return decode_inner
@decode_req_encode_rsp
def process_requests(reqs):
"""Process Ceph broker request(s).
This is a versioned api. API version must be supplied by the client making
the request.
"""
try:
version = reqs.get('api-version')
if version == 1:
return process_requests_v1(reqs['ops'])
except Exception as exc:
log(str(exc), level=ERROR)
msg = ("Unexpected error occurred while processing requests: %s" %
(reqs))
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
msg = ("Missing or invalid api version (%s)" % (version))
return {'exit-code': 1, 'stderr': msg}
def process_requests_v1(reqs):
"""Process v1 requests.
Takes a list of requests (dicts) and processes each one. If an error is
found, processing stops and the client is notified in the response.
Returns a response dict containing the exit code (non-zero if any
operation failed along with an explanation).
"""
log("Processing %s ceph broker requests" % (len(reqs)), level=INFO)
for req in reqs:
op = req.get('op')
log("Processing op='%s'" % (op), level=DEBUG)
# Use admin client since we do not have other client key locations
# setup to use them for these operations.
svc = 'admin'
if op == "create-pool":
params = {'pool': req.get('name'),
'replicas': req.get('replicas')}
if not all(params.iteritems()):
msg = ("Missing parameter(s): %s" %
(' '.join([k for k in params.iterkeys()
if not params[k]])))
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
pool = params['pool']
replicas = params['replicas']
if not pool_exists(service=svc, name=pool):
log("Creating pool '%s' (replicas=%s)" % (pool, replicas),
level=INFO)
create_pool(service=svc, name=pool, replicas=replicas)
else:
log("Pool '%s' already exists - skipping create" % (pool),
level=DEBUG)
else:
msg = "Unknown operation '%s'" % (op)
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
return {'exit-code': 0}

View File

@ -8,7 +8,6 @@ from functools import partial
from charmhelpers.core.hookenv import unit_get
from charmhelpers.fetch import apt_install
from charmhelpers.core.hookenv import (
WARNING,
ERROR,
log
)
@ -175,7 +174,6 @@ def format_ipv6_addr(address):
if is_ipv6(address):
address = "[%s]" % address
else:
log("Not a valid ipv6 address: %s" % address, level=WARNING)
address = None
return address

View File

@ -0,0 +1,388 @@
#
# Copyright 2012 Canonical Ltd.
#
# This file is sourced from lp:openstack-charm-helpers
#
# Authors:
# James Page <james.page@ubuntu.com>
# Adam Gandelman <adamg@ubuntu.com>
#
import os
import shutil
import json
import time
from subprocess import (
check_call,
check_output,
CalledProcessError
)
from charmhelpers.core.hookenv import (
relation_get,
relation_ids,
related_units,
log,
INFO,
WARNING,
ERROR
)
from charmhelpers.core.host import (
mount,
mounts,
service_start,
service_stop,
service_running,
umount,
)
from charmhelpers.fetch import (
apt_install,
)
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
KEYFILE = '/etc/ceph/ceph.client.{}.key'
CEPH_CONF = """[global]
auth supported = {auth}
keyring = {keyring}
mon host = {mon_hosts}
log to syslog = {use_syslog}
err to syslog = {use_syslog}
clog to syslog = {use_syslog}
"""
def install():
''' Basic Ceph client installation '''
ceph_dir = "/etc/ceph"
if not os.path.exists(ceph_dir):
os.mkdir(ceph_dir)
apt_install('ceph-common', fatal=True)
def rbd_exists(service, pool, rbd_img):
''' Check to see if a RADOS block device exists '''
try:
out = check_output(['rbd', 'list', '--id', service,
'--pool', pool])
except CalledProcessError:
return False
else:
return rbd_img in out
def create_rbd_image(service, pool, image, sizemb):
''' Create a new RADOS block device '''
cmd = [
'rbd',
'create',
image,
'--size',
str(sizemb),
'--id',
service,
'--pool',
pool
]
check_call(cmd)
def pool_exists(service, name):
''' Check to see if a RADOS pool already exists '''
try:
out = check_output(['rados', '--id', service, 'lspools'])
except CalledProcessError:
return False
else:
return name in out
def get_osds(service):
'''
Return a list of all Ceph Object Storage Daemons
currently in the cluster
'''
version = ceph_version()
if version and version >= '0.56':
return json.loads(check_output(['ceph', '--id', service,
'osd', 'ls', '--format=json']))
else:
return None
def create_pool(service, name, replicas=3):
''' Create a new RADOS pool '''
if pool_exists(service, name):
log("Ceph pool {} already exists, skipping creation".format(name),
level=WARNING)
return
# Calculate the number of placement groups based
# on upstream recommended best practices.
osds = get_osds(service)
if osds:
pgnum = (len(osds) * 100 / replicas)
else:
# NOTE(james-page): Default to 200 for older ceph versions
# which don't support OSD query from cli
pgnum = 200
cmd = [
'ceph', '--id', service,
'osd', 'pool', 'create',
name, str(pgnum)
]
check_call(cmd)
cmd = [
'ceph', '--id', service,
'osd', 'pool', 'set', name,
'size', str(replicas)
]
check_call(cmd)
def delete_pool(service, name):
''' Delete a RADOS pool from ceph '''
cmd = [
'ceph', '--id', service,
'osd', 'pool', 'delete',
name, '--yes-i-really-really-mean-it'
]
check_call(cmd)
def _keyfile_path(service):
return KEYFILE.format(service)
def _keyring_path(service):
return KEYRING.format(service)
def create_keyring(service, key):
''' Create a new Ceph keyring containing key'''
keyring = _keyring_path(service)
if os.path.exists(keyring):
log('ceph: Keyring exists at %s.' % keyring, level=WARNING)
return
cmd = [
'ceph-authtool',
keyring,
'--create-keyring',
'--name=client.{}'.format(service),
'--add-key={}'.format(key)
]
check_call(cmd)
log('ceph: Created new ring at %s.' % keyring, level=INFO)
def create_key_file(service, key):
''' Create a file containing key '''
keyfile = _keyfile_path(service)
if os.path.exists(keyfile):
log('ceph: Keyfile exists at %s.' % keyfile, level=WARNING)
return
with open(keyfile, 'w') as fd:
fd.write(key)
log('ceph: Created new keyfile at %s.' % keyfile, level=INFO)
def get_ceph_nodes():
''' Query named relation 'ceph' to detemine current nodes '''
hosts = []
for r_id in relation_ids('ceph'):
for unit in related_units(r_id):
hosts.append(relation_get('private-address', unit=unit, rid=r_id))
return hosts
def configure(service, key, auth, use_syslog):
''' Perform basic configuration of Ceph '''
create_keyring(service, key)
create_key_file(service, key)
hosts = get_ceph_nodes()
with open('/etc/ceph/ceph.conf', 'w') as ceph_conf:
ceph_conf.write(CEPH_CONF.format(auth=auth,
keyring=_keyring_path(service),
mon_hosts=",".join(map(str, hosts)),
use_syslog=use_syslog))
modprobe('rbd')
def image_mapped(name):
''' Determine whether a RADOS block device is mapped locally '''
try:
out = check_output(['rbd', 'showmapped'])
except CalledProcessError:
return False
else:
return name in out
def map_block_storage(service, pool, image):
''' Map a RADOS block device for local use '''
cmd = [
'rbd',
'map',
'{}/{}'.format(pool, image),
'--user',
service,
'--secret',
_keyfile_path(service),
]
check_call(cmd)
def filesystem_mounted(fs):
''' Determine whether a filesytems is already mounted '''
return fs in [f for f, m in mounts()]
def make_filesystem(blk_device, fstype='ext4', timeout=10):
''' Make a new filesystem on the specified block device '''
count = 0
e_noent = os.errno.ENOENT
while not os.path.exists(blk_device):
if count >= timeout:
log('ceph: gave up waiting on block device %s' % blk_device,
level=ERROR)
raise IOError(e_noent, os.strerror(e_noent), blk_device)
log('ceph: waiting for block device %s to appear' % blk_device,
level=INFO)
count += 1
time.sleep(1)
else:
log('ceph: Formatting block device %s as filesystem %s.' %
(blk_device, fstype), level=INFO)
check_call(['mkfs', '-t', fstype, blk_device])
def place_data_on_block_device(blk_device, data_src_dst):
''' Migrate data in data_src_dst to blk_device and then remount '''
# mount block device into /mnt
mount(blk_device, '/mnt')
# copy data to /mnt
copy_files(data_src_dst, '/mnt')
# umount block device
umount('/mnt')
# Grab user/group ID's from original source
_dir = os.stat(data_src_dst)
uid = _dir.st_uid
gid = _dir.st_gid
# re-mount where the data should originally be
# TODO: persist is currently a NO-OP in core.host
mount(blk_device, data_src_dst, persist=True)
# ensure original ownership of new mount.
os.chown(data_src_dst, uid, gid)
# TODO: re-use
def modprobe(module):
''' Load a kernel module and configure for auto-load on reboot '''
log('ceph: Loading kernel module', level=INFO)
cmd = ['modprobe', module]
check_call(cmd)
with open('/etc/modules', 'r+') as modules:
if module not in modules.read():
modules.write(module)
def copy_files(src, dst, symlinks=False, ignore=None):
''' Copy files from src to dst '''
for item in os.listdir(src):
s = os.path.join(src, item)
d = os.path.join(dst, item)
if os.path.isdir(s):
shutil.copytree(s, d, symlinks, ignore)
else:
shutil.copy2(s, d)
def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
blk_device, fstype, system_services=[],
replicas=3):
"""
NOTE: This function must only be called from a single service unit for
the same rbd_img otherwise data loss will occur.
Ensures given pool and RBD image exists, is mapped to a block device,
and the device is formatted and mounted at the given mount_point.
If formatting a device for the first time, data existing at mount_point
will be migrated to the RBD device before being re-mounted.
All services listed in system_services will be stopped prior to data
migration and restarted when complete.
"""
# Ensure pool, RBD image, RBD mappings are in place.
if not pool_exists(service, pool):
log('ceph: Creating new pool {}.'.format(pool))
create_pool(service, pool, replicas=replicas)
if not rbd_exists(service, pool, rbd_img):
log('ceph: Creating RBD image ({}).'.format(rbd_img))
create_rbd_image(service, pool, rbd_img, sizemb)
if not image_mapped(rbd_img):
log('ceph: Mapping RBD Image {} as a Block Device.'.format(rbd_img))
map_block_storage(service, pool, rbd_img)
# make file system
# TODO: What happens if for whatever reason this is run again and
# the data is already in the rbd device and/or is mounted??
# When it is mounted already, it will fail to make the fs
# XXX: This is really sketchy! Need to at least add an fstab entry
# otherwise this hook will blow away existing data if its executed
# after a reboot.
if not filesystem_mounted(mount_point):
make_filesystem(blk_device, fstype)
for svc in system_services:
if service_running(svc):
log('ceph: Stopping services {} prior to migrating data.'
.format(svc))
service_stop(svc)
place_data_on_block_device(blk_device, mount_point)
for svc in system_services:
log('ceph: Starting service {} after migrating data.'
.format(svc))
service_start(svc)
def ensure_ceph_keyring(service, user=None, group=None):
'''
Ensures a ceph keyring is created for a named service
and optionally ensures user and group ownership.
Returns False if no ceph key is available in relation state.
'''
key = None
for rid in relation_ids('ceph'):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
break
if not key:
return False
create_keyring(service=service, key=key)
keyring = _keyring_path(service)
if user and group:
check_call(['chown', '%s.%s' % (user, group), keyring])
return True
def ceph_version():
''' Retrieve the local version of ceph '''
if os.path.exists('/usr/bin/ceph'):
cmd = ['ceph', '-v']
output = check_output(cmd)
output = output.split()
if len(output) > 3:
return output[2]
else:
return None
else:
return None

View File

@ -1,2 +1,2 @@
from .base import *
from .helpers import *
from .base import * # NOQA
from .helpers import * # NOQA

View File

@ -256,7 +256,7 @@ def add_source(source, key=None):
elif source == 'distro':
pass
else:
raise SourceConfigError("Unknown source: {!r}".format(source))
log("Unknown source: {!r}".format(source))
if key:
if '-----BEGIN PGP PUBLIC KEY BLOCK-----' in key:

View File

@ -0,0 +1 @@
hooks.py

View File

@ -15,7 +15,9 @@ import sys
import ceph
from charmhelpers.core.hookenv import (
log, ERROR,
log,
DEBUG,
ERROR,
config,
relation_ids,
related_units,
@ -25,7 +27,6 @@ from charmhelpers.core.hookenv import (
Hooks, UnregisteredHookError,
service_name
)
from charmhelpers.core.host import (
service_restart,
umount,
@ -44,12 +45,14 @@ from charmhelpers.contrib.network.ip import (
get_ipv6_addr,
format_ipv6_addr
)
from utils import (
render_template,
get_public_addr,
assert_charm_supports_ipv6
)
from ceph_broker import (
process_requests
)
hooks = Hooks()
@ -215,7 +218,7 @@ def notify_radosgws():
def notify_client():
for relid in relation_ids('client'):
client_relation(relid)
client_relation_joined(relid)
def upgrade_keys():
@ -266,28 +269,46 @@ def radosgw_relation(relid=None):
@hooks.hook('client-relation-joined')
def client_relation(relid=None):
def client_relation_joined(relid=None):
if ceph.is_quorum():
log('mon cluster in quorum - providing client with keys')
service_name = None
if relid is None:
service_name = remote_unit().split('/')[0]
units = [remote_unit()]
service_name = units[0].split('/')[0]
else:
units = related_units(relid)
if len(units) > 0:
service_name = units[0].split('/')[0]
if service_name is not None:
data = {
'key': ceph.get_named_key(service_name),
'auth': config('auth-supported'),
'ceph-public-address': get_public_addr(),
}
data = {'key': ceph.get_named_key(service_name),
'auth': config('auth-supported'),
'ceph-public-address': get_public_addr()}
relation_set(relation_id=relid,
relation_settings=data)
client_relation_changed(relid=relid)
else:
log('mon cluster not in quorum - deferring key provision')
@hooks.hook('client-relation-changed')
def client_relation_changed(relid=None):
"""Process broker requests from ceph client relations."""
if ceph.is_quorum():
settings = relation_get(rid=relid)
if 'broker_req' in settings:
if not ceph.is_leader():
log("Not leader - ignoring broker request", level=DEBUG)
else:
rsp = process_requests(settings['broker_req'])
relation_set(relation_id=relid,
relation_settings={'broker_rsp': rsp})
else:
log('mon cluster not in quorum', level=DEBUG)
@hooks.hook('upgrade-charm')
def upgrade_charm():
emit_cephconf()

5
setup.cfg Normal file
View File

@ -0,0 +1,5 @@
[nosetests]
verbosity=2
with-coverage=1
cover-erase=1
cover-package=hooks

2
unit_tests/__init__.py Normal file
View File

@ -0,0 +1,2 @@
import sys
sys.path.append('hooks')

View File

@ -0,0 +1,72 @@
import json
import mock
import unittest
import ceph_broker
class CephBrokerTestCase(unittest.TestCase):
def setUp(self):
super(CephBrokerTestCase, self).setUp()
@mock.patch('ceph_broker.log')
def test_process_requests_noop(self, mock_log):
req = json.dumps({'api-version': 1, 'ops': []})
rc = ceph_broker.process_requests(req)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.log')
def test_process_requests_missing_api_version(self, mock_log):
req = json.dumps({'ops': []})
rc = ceph_broker.process_requests(req)
self.assertEqual(json.loads(rc), {'exit-code': 1,
'stderr':
('Missing or invalid api version '
'(None)')})
@mock.patch('ceph_broker.log')
def test_process_requests_invalid_api_version(self, mock_log):
req = json.dumps({'api-version': 2, 'ops': []})
rc = ceph_broker.process_requests(req)
self.assertEqual(json.loads(rc),
{'exit-code': 1,
'stderr': 'Missing or invalid api version (2)'})
@mock.patch('ceph_broker.log')
def test_process_requests_invalid(self, mock_log):
reqs = json.dumps({'api-version': 1, 'ops': [{'op': 'invalid_op'}]})
rc = ceph_broker.process_requests(reqs)
self.assertEqual(json.loads(rc),
{'exit-code': 1,
'stderr': "Unknown operation 'invalid_op'"})
@mock.patch('ceph_broker.create_pool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
def test_process_requests_create_pool(self, mock_log, mock_pool_exists,
mock_create_pool):
mock_pool_exists.return_value = False
reqs = json.dumps({'api-version': 1,
'ops': [{'op': 'create-pool', 'name':
'foo', 'replicas': 3}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_create_pool.assert_called_with(service='admin', name='foo',
replicas=3)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.create_pool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
def test_process_requests_create_pool_exists(self, mock_log,
mock_pool_exists,
mock_create_pool):
mock_pool_exists.return_value = True
reqs = json.dumps({'api-version': 1,
'ops': [{'op': 'create-pool', 'name': 'foo',
'replicas': 3}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
self.assertFalse(mock_create_pool.called)
self.assertEqual(json.loads(rc), {'exit-code': 0})