[hopem,r=]

Implemented Ceph broker support for ceph-client relation.
Edward Hope-Morley
2014-11-07 15:47:00 +01:00
parent ea938dbdb5
commit 18b7bebcc2
15 changed files with 188 additions and 56 deletions
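
Instead of creating RADOS pools directly from the client charm, cinder now sends a "broker request" over the ceph relation and acts on the broker's reply. A minimal sketch of that exchange, assuming the same relation keys used in the diff below ('broker_req' and 'broker_rsp'); the pool name and replica count are example values:

import json

from charmhelpers.core.hookenv import relation_ids, relation_set


def request_pool(pool='cinder', replicas=3):
    # One dict per requested operation; the ceph charm's broker executes it
    # and reports the outcome in the 'broker_rsp' relation setting, which the
    # ceph-relation-changed hook below checks on the next invocation.
    broker_ops = [{'op': 'create_pool', 'pool': pool, 'replicas': replicas}]
    for rid in relation_ids('ceph'):
        relation_set(relation_id=rid, broker_req=json.dumps(broker_ops))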

View File

@@ -8,7 +8,6 @@ from functools import partial
 from charmhelpers.core.hookenv import unit_get
 from charmhelpers.fetch import apt_install
 from charmhelpers.core.hookenv import (
-    WARNING,
     ERROR,
     log
 )
@@ -175,7 +174,6 @@ def format_ipv6_addr(address):
     if is_ipv6(address):
         address = "[%s]" % address
     else:
-        log("Not a valid ipv6 address: %s" % address, level=WARNING)
         address = None
 
     return address

View File

@@ -15,6 +15,7 @@ from charmhelpers.fetch import (
 from charmhelpers.core.hookenv import (
     config,
+    is_relation_made,
     local_unit,
     log,
     relation_get,
@@ -24,7 +25,7 @@ from charmhelpers.core.hookenv import (
     unit_get,
     unit_private_ip,
     ERROR,
-    INFO
+    DEBUG
 )
 
 from charmhelpers.core.host import (
@@ -57,8 +58,9 @@ from charmhelpers.contrib.network.ip import (
     is_address_in_network
 )
 
-from charmhelpers.contrib.openstack.utils import get_host_ip
+from charmhelpers.contrib.openstack.utils import (
+    get_host_ip,
+)
 
 CA_CERT_PATH = '/usr/local/share/ca-certificates/keystone_juju_ca_cert.crt'
@@ -700,6 +702,7 @@ class NeutronContext(OSContextGenerator):
                                           self.network_manager)
         n1kv_config = neutron_plugin_attribute(self.plugin, 'config',
                                                self.network_manager)
+        n1kv_user_config_flags = config('n1kv-config-flags')
         n1kv_ctxt = {
             'core_plugin': driver,
             'neutron_plugin': 'n1kv',
@@ -710,11 +713,29 @@ class NeutronContext(OSContextGenerator):
             'vsm_username': config('n1kv-vsm-username'),
             'vsm_password': config('n1kv-vsm-password'),
             'restrict_policy_profiles': config(
-                'n1kv_restrict_policy_profiles'),
+                'n1kv-restrict-policy-profiles'),
         }
+        if n1kv_user_config_flags:
+            flags = config_flags_parser(n1kv_user_config_flags)
+            n1kv_ctxt['user_config_flags'] = flags
         return n1kv_ctxt
 
+    def calico_ctxt(self):
+        driver = neutron_plugin_attribute(self.plugin, 'driver',
+                                          self.network_manager)
+        config = neutron_plugin_attribute(self.plugin, 'config',
+                                          self.network_manager)
+        calico_ctxt = {
+            'core_plugin': driver,
+            'neutron_plugin': 'Calico',
+            'neutron_security_groups': self.neutron_security_groups,
+            'local_ip': unit_private_ip(),
+            'config': config
+        }
+
+        return calico_ctxt
+
     def neutron_ctxt(self):
         if https():
             proto = 'https'
@@ -748,6 +769,8 @@ class NeutronContext(OSContextGenerator):
             ctxt.update(self.nvp_ctxt())
         elif self.plugin == 'n1kv':
             ctxt.update(self.n1kv_ctxt())
+        elif self.plugin == 'Calico':
+            ctxt.update(self.calico_ctxt())
 
         alchemy_flags = config('neutron-alchemy-flags')
         if alchemy_flags:
@@ -867,7 +890,7 @@ class SubordinateConfigContext(OSContextGenerator):
                         else:
                             ctxt[k] = v
 
-        log("%d section(s) found" % (len(ctxt['sections'])), level=INFO)
+        log("%d section(s) found" % (len(ctxt['sections'])), level=DEBUG)
         return ctxt
@@ -922,3 +945,34 @@ class WorkerConfigContext(OSContextGenerator):
             "workers": self.num_cpus * multiplier
         }
         return ctxt
+
+
+class ZeroMQContext(OSContextGenerator):
+    interfaces = ['zeromq-configuration']
+
+    def __call__(self):
+        ctxt = {}
+        if is_relation_made('zeromq-configuration', 'host'):
+            for rid in relation_ids('zeromq-configuration'):
+                for unit in related_units(rid):
+                    ctxt['zmq_nonce'] = relation_get('nonce', unit, rid)
+                    ctxt['zmq_host'] = relation_get('host', unit, rid)
+        return ctxt
+
+
+class NotificationDriverContext(OSContextGenerator):
+    def __init__(self, zmq_relation='zeromq-configuration',
+                 amqp_relation='amqp'):
+        """
+        :param zmq_relation: Name of Zeromq relation to check
+        """
+        self.zmq_relation = zmq_relation
+        self.amqp_relation = amqp_relation
+
+    def __call__(self):
+        ctxt = {
+            'notifications': 'False',
+        }
+        if is_relation_made(self.amqp_relation):
+            ctxt['notifications'] = "True"
+        return ctxt

View File

@@ -138,10 +138,25 @@ def neutron_plugins():
                                       relation_prefix='neutron',
                                       ssl_dir=NEUTRON_CONF_DIR)],
             'services': [],
-            'packages': [['neutron-plugin-cisco']],
+            'packages': [[headers_package()] + determine_dkms_package(),
+                         ['neutron-plugin-cisco']],
             'server_packages': ['neutron-server',
                                 'neutron-plugin-cisco'],
             'server_services': ['neutron-server']
+        },
+        'Calico': {
+            'config': '/etc/neutron/plugins/ml2/ml2_conf.ini',
+            'driver': 'neutron.plugins.ml2.plugin.Ml2Plugin',
+            'contexts': [
+                context.SharedDBContext(user=config('neutron-database-user'),
+                                        database=config('neutron-database'),
+                                        relation_prefix='neutron',
+                                        ssl_dir=NEUTRON_CONF_DIR)],
+            'services': ['calico-compute', 'bird', 'neutron-dhcp-agent'],
+            'packages': [[headers_package()] + determine_dkms_package(),
+                         ['calico-compute', 'bird', 'neutron-dhcp-agent']],
+            'server_packages': ['neutron-server', 'calico-control'],
+            'server_services': ['neutron-server']
         }
     }
     if release >= 'icehouse':

View File

@@ -2,6 +2,7 @@
 
 # Common python helper functions used for OpenStack charms.
 from collections import OrderedDict
+from functools import wraps
 
 import subprocess
 import json
@@ -468,6 +469,14 @@ def get_hostname(address, fqdn=True):
         return result.split('.')[0]
 
 
+def get_matchmaker_map(mm_file='/etc/oslo/matchmaker_ring.json'):
+    mm_map = {}
+    if os.path.isfile(mm_file):
+        with open(mm_file, 'r') as f:
+            mm_map = json.load(f)
+    return mm_map
+
+
 def sync_db_with_multi_ipv6_addresses(database, database_user,
                                       relation_prefix=None):
     hosts = get_ipv6_addr(dynamic_only=False)
@@ -484,3 +493,18 @@ def sync_db_with_multi_ipv6_addresses(database, database_user,
 
     for rid in relation_ids('shared-db'):
         relation_set(relation_id=rid, **kwargs)
+
+
+def os_requires_version(ostack_release, pkg):
+    """
+    Decorator for hook to specify minimum supported release
+    """
+    def wrap(f):
+        @wraps(f)
+        def wrapped_f(*args):
+            if os_release(pkg) < ostack_release:
+                raise Exception("This hook is not supported on releases"
+                                " before %s" % ostack_release)
+            f(*args)
+        return wrapped_f
+    return wrap
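
The new os_requires_version decorator lets a hook declare the minimum OpenStack release it supports. A hypothetical usage sketch (the release and package names are illustrative, not taken from the diff):

from charmhelpers.contrib.openstack.utils import os_requires_version


@os_requires_version('icehouse', 'cinder-common')
def config_changed():
    # Raises before the hook body runs if os_release('cinder-common')
    # reports anything older than 'icehouse'.
    pass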

View File

@@ -113,7 +113,7 @@ def get_osds(service):
     return None
 
 
-def create_pool(service, name, replicas=2):
+def create_pool(service, name, replicas=3):
     ''' Create a new RADOS pool '''
     if pool_exists(service, name):
         log("Ceph pool {} already exists, skipping creation".format(name),
@@ -300,7 +300,8 @@ def copy_files(src, dst, symlinks=False, ignore=None):
 
 
 def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
-                        blk_device, fstype, system_services=[]):
+                        blk_device, fstype, system_services=[],
+                        replicas=3):
     """
     NOTE: This function must only be called from a single service unit for
     the same rbd_img otherwise data loss will occur.
@@ -317,7 +318,7 @@ def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
     # Ensure pool, RBD image, RBD mappings are in place.
     if not pool_exists(service, pool):
         log('ceph: Creating new pool {}.'.format(pool))
-        create_pool(service, pool)
+        create_pool(service, pool, replicas=replicas)
 
     if not rbd_exists(service, pool, rbd_img):
         log('ceph: Creating RBD image ({}).'.format(rbd_img))
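
With the new keyword argument, callers can size the pool when the storage helper creates it. A hypothetical call (service, pool, device and size values are placeholders):

from charmhelpers.contrib.storage.linux.ceph import ensure_ceph_storage

ensure_ceph_storage(service='cinder', pool='cinder', rbd_img='cinder',
                    sizemb=1024, mount_point='/srv/cinder',
                    blk_device='/dev/rbd1', fstype='ext4', replicas=3)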

View File

@@ -214,6 +214,12 @@ class Config(dict):
         except KeyError:
             return (self._prev_dict or {})[key]
 
+    def keys(self):
+        prev_keys = []
+        if self._prev_dict is not None:
+            prev_keys = self._prev_dict.keys()
+        return list(set(prev_keys + dict.keys(self)))
+
     def load_previous(self, path=None):
         """Load previous copy of config from disk.

View File

@@ -6,13 +6,13 @@
 # Matthew Wedgwood <matthew.wedgwood@canonical.com>
 
 import os
+import re
 import pwd
 import grp
 import random
 import string
 import subprocess
 import hashlib
-import shutil
 from contextlib import contextmanager
 from collections import OrderedDict
@@ -317,7 +317,13 @@ def list_nics(nic_type):
         ip_output = (line for line in ip_output if line)
         for line in ip_output:
             if line.split()[1].startswith(int_type):
-                interfaces.append(line.split()[1].replace(":", ""))
+                matched = re.search('.*: (bond[0-9]+\.[0-9]+)@.*', line)
+                if matched:
+                    interface = matched.groups()[0]
+                else:
+                    interface = line.split()[1].replace(":", "")
+                interfaces.append(interface)
 
     return interfaces

View File

@@ -1,2 +1,2 @@
-from .base import *
-from .helpers import *
+from .base import *  # NOQA
+from .helpers import *  # NOQA

View File

@@ -72,6 +72,7 @@ CLOUD_ARCHIVE_POCKETS = {
 FETCH_HANDLERS = (
     'charmhelpers.fetch.archiveurl.ArchiveUrlFetchHandler',
     'charmhelpers.fetch.bzrurl.BzrUrlFetchHandler',
+    'charmhelpers.fetch.giturl.GitUrlFetchHandler',
 )
 
 APT_NO_LOCK = 100  # The return code for "couldn't acquire lock" in APT.
@@ -218,6 +219,7 @@ def add_source(source, key=None):
        pocket for the release.
     'cloud:' may be used to activate official cloud archive pockets,
        such as 'cloud:icehouse'
+    'distro' may be used as a noop
 
     @param key: A key to be added to the system's APT keyring and used
            to verify the signatures on packages. Ideally, this should be an
@@ -251,8 +253,10 @@ def add_source(source, key=None):
         release = lsb_release()['DISTRIB_CODENAME']
         with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
             apt.write(PROPOSED_POCKET.format(release))
+    elif source == 'distro':
+        pass
     else:
-        raise SourceConfigError("Unknown source: {!r}".format(source))
+        log("Unknown source: {!r}".format(source))
 
     if key:
         if '-----BEGIN PGP PUBLIC KEY BLOCK-----' in key:
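
After this change a source of 'distro' is accepted as a no-op and unrecognised sources are logged rather than raising SourceConfigError. For example (hypothetical calls):

from charmhelpers.fetch import add_source

add_source('distro')          # accepted, leaves APT sources untouched
add_source('cloud:icehouse')  # still enables the cloud archive pocket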

View File

@@ -0,0 +1,44 @@
+import os
+from charmhelpers.fetch import (
+    BaseFetchHandler,
+    UnhandledSource
+)
+from charmhelpers.core.host import mkdir
+
+try:
+    from git import Repo
+except ImportError:
+    from charmhelpers.fetch import apt_install
+    apt_install("python-git")
+    from git import Repo
+
+
+class GitUrlFetchHandler(BaseFetchHandler):
+    """Handler for git branches via generic and github URLs"""
+    def can_handle(self, source):
+        url_parts = self.parse_url(source)
+        # TODO (mattyw) no support for ssh git@ yet
+        if url_parts.scheme not in ('http', 'https', 'git'):
+            return False
+        else:
+            return True
+
+    def clone(self, source, dest, branch):
+        if not self.can_handle(source):
+            raise UnhandledSource("Cannot handle {}".format(source))
+
+        repo = Repo.clone_from(source, dest)
+        repo.git.checkout(branch)
+
+    def install(self, source, branch="master"):
+        url_parts = self.parse_url(source)
+        branch_name = url_parts.path.strip("/").split("/")[-1]
+        dest_dir = os.path.join(os.environ.get('CHARM_DIR'), "fetched",
+                                branch_name)
+        if not os.path.exists(dest_dir):
+            mkdir(dest_dir, perms=0755)
+        try:
+            self.clone(source, dest_dir, branch)
+        except OSError as e:
+            raise UnhandledSource(e.strerror)
+        return dest_dir
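
A hypothetical example of the new handler used in isolation (the URL and branch are placeholders); in normal use it is selected automatically through the FETCH_HANDLERS tuple extended above:

from charmhelpers.fetch.giturl import GitUrlFetchHandler

handler = GitUrlFetchHandler()
if handler.can_handle('https://github.com/example/some-repo'):
    # Clones into $CHARM_DIR/fetched/<last path segment> and checks out
    # the requested branch.
    dest = handler.install('https://github.com/example/some-repo',
                           branch='master')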

View File

@@ -1,5 +1,5 @@
 #!/usr/bin/python
+import json
 import os
 import sys
 import uuid
@@ -9,7 +9,6 @@ from subprocess import check_call
 from cinder_utils import (
     determine_packages,
     do_openstack_upgrade,
-    ensure_ceph_pool,
     juju_log,
     migrate_database,
     configure_lvm_storage,
@@ -37,6 +36,7 @@ from charmhelpers.core.hookenv import (
     unit_get,
     log,
     ERROR,
+    INFO
 )
 
 from charmhelpers.fetch import (
@@ -254,23 +254,34 @@ def ceph_joined():
 
 @hooks.hook('ceph-relation-changed')
 @restart_on_change(restart_map())
-def ceph_changed():
+def ceph_changed(relation_id=None):
     if 'ceph' not in CONFIGS.complete_contexts():
         juju_log('ceph relation incomplete. Peer not ready?')
         return
 
+    settings = relation_get(rid=relation_id)
     svc = service_name()
     if not ensure_ceph_keyring(service=svc,
                                user='cinder', group='cinder'):
         juju_log('Could not create ceph keyring: peer not ready?')
         return
+
+    if 'broker_rsp' in settings:
+        rsp = settings['broker_rsp']
+        if not rsp:
+            log("Ceph broker request failed (rsp=%s)" % (rsp), level=ERROR)
+            return
+        else:
+            log("Ceph broker request succeeded (rsp=%s)" % (rsp), level=INFO)
+    else:
+        broker_ops = [{'op': 'create_pool', 'pool': svc,
+                       'replicas': config('ceph-osd-replication-count')}]
+        for rid in relation_ids('ceph'):
+            relation_set(relation_id=rid, broker_req=json.dumps(broker_ops))
+            log("Request sent to Ceph broker (rid=%s)" % (rid))
+
+    set_ceph_env_variables(service=svc)
     CONFIGS.write(CINDER_CONF)
     CONFIGS.write(ceph_config_file())
-    set_ceph_env_variables(service=svc)
-    if eligible_leader(CLUSTER_RES):
-        _config = config()
-        ensure_ceph_pool(service=svc,
-                         replicas=_config['ceph-osd-replication-count'])
 
 
 @hooks.hook('cluster-relation-joined')

View File

@@ -27,11 +27,6 @@ from charmhelpers.core.host import (
     lsb_release
 )
 
-from charmhelpers.contrib.storage.linux.ceph import (
-    create_pool as ceph_create_pool,
-    pool_exists as ceph_pool_exists,
-)
-
 from charmhelpers.contrib.openstack.alternatives import install_alternative
 from charmhelpers.contrib.hahelpers.cluster import (
     eligible_leader,
@@ -397,13 +392,6 @@ def migrate_database():
     subprocess.check_call(cmd)
 
 
-def ensure_ceph_pool(service, replicas):
-    'Creates a ceph pool for service if one does not exist'
-    # TODO(Ditto about moving somewhere sharable)
-    if not ceph_pool_exists(service=service, name=service):
-        ceph_create_pool(service=service, name=service, replicas=replicas)
-
-
 def set_ceph_env_variables(service):
     # XXX: Horrid kludge to make cinder-volume use
     # a different ceph username than admin

View File

@@ -31,7 +31,6 @@ TO_PATCH = [
     'determine_packages',
     'do_openstack_upgrade',
     'ensure_ceph_keyring',
-    'ensure_ceph_pool',
     'juju_log',
     'log',
     'lsb_release',
@@ -372,7 +371,6 @@ class TestJoinedHooks(CharmTestCase):
         self.ensure_ceph_keyring.assert_called_with(service='cinder',
                                                     user='cinder',
                                                     group='cinder')
-        self.ensure_ceph_pool.assert_called_with(service='cinder', replicas=3)
         for c in [call('/var/lib/charm/cinder/ceph.conf'),
                   call('/etc/cinder/cinder.conf')]:
             self.assertIn(c, self.CONFIGS.write.call_args_list)
@@ -395,7 +393,6 @@ class TestJoinedHooks(CharmTestCase):
         self.service_name.return_value = 'cinder'
         self.ensure_ceph_keyring.return_value = True
         hooks.hooks.execute(['hooks/ceph-relation-changed'])
-        self.assertFalse(self.ensure_ceph_pool.called)
 
 
 class TestDepartedHooks(CharmTestCase):

View File

@@ -19,8 +19,6 @@ TO_PATCH = [
     'umount',
     'mkdir',
     # ceph utils
-    'ceph_create_pool',
-    'ceph_pool_exists',
     # storage_utils
     'create_lvm_physical_volume',
     'create_lvm_volume_group',
@@ -351,18 +349,6 @@ class TestCinderUtils(CharmTestCase):
         cinder_utils.migrate_database()
         check_call.assert_called_with(['cinder-manage', 'db', 'sync'])
 
-    def test_ensure_ceph_pool(self):
-        self.ceph_pool_exists.return_value = False
-        cinder_utils.ensure_ceph_pool(service='cinder', replicas=3)
-        self.ceph_create_pool.assert_called_with(service='cinder',
-                                                 name='cinder',
-                                                 replicas=3)
-
-    def test_ensure_ceph_pool_already_exists(self):
-        self.ceph_pool_exists.return_value = True
-        cinder_utils.ensure_ceph_pool(service='cinder', replicas=3)
-        self.assertFalse(self.ceph_create_pool.called)
-
     @patch('os.path.exists')
     def test_register_configs_apache(self, exists):
         exists.return_value = False

View File

@@ -26,8 +26,6 @@ from test_utils import (
 TO_PATCH = [
     # cinder_utils
     'determine_packages',
-    'ensure_ceph_keyring',
-    'ensure_ceph_pool',
     'juju_log',
     'lsb_release',
     'migrate_database',