charm-ceph-proxy/hooks/utils.py
Alex Kavanagh ddd26acc6d Convert the charm to Python3
* Move charmhelpers to the root of the charm
* sync charmhelpers to latest version

Change-Id: Id0b838f0206635cf912d205f2fb6fda7b31d0dfe
2019-03-10 14:16:41 +00:00

150 lines
3.7 KiB
Python

#
# Copyright 2012 Canonical Ltd.
#
# Authors:
# James Page <james.page@ubuntu.com>
# Paul Collins <paul.collins@canonical.com>
#
import socket
import re
from charmhelpers.core.hookenv import (
unit_get,
cached,
config,
status_set,
network_get_primary_address,
log,
DEBUG,
)
from charmhelpers.fetch import (
apt_install,
filter_installed_packages,
)
from charmhelpers.core.host import (
lsb_release,
CompareHostReleases,
)
from charmhelpers.contrib.network.ip import (
get_address_in_network,
get_ipv6_addr,
)
try:
import dns.resolver
except ImportError:
apt_install(filter_installed_packages(['python-dnspython']),
fatal=True)
import dns.resolver
def enable_pocket(pocket):
apt_sources = "/etc/apt/sources.list"
with open(apt_sources, "rt") as sources:
lines = sources.readlines()
with open(apt_sources, "wt") as sources:
for line in lines:
if pocket in line:
sources.write(re.sub('^# deb', 'deb', line))
else:
sources.write(line)
@cached
def get_unit_hostname():
return socket.gethostname()
@cached
def get_host_ip(hostname=None):
if config('prefer-ipv6'):
return get_ipv6_addr()[0]
hostname = hostname or unit_get('private-address')
try:
# Test to see if already an IPv4 address
socket.inet_aton(hostname)
return hostname
except socket.error:
# This may throw an NXDOMAIN exception; in which case
# things are badly broken so just let it kill the hook
answers = dns.resolver.query(hostname, 'A')
if answers:
return answers[0].address
@cached
def get_public_addr():
if config('ceph-public-network'):
return get_network_addrs('ceph-public-network')[0]
try:
return network_get_primary_address('public')
except NotImplementedError:
log("network-get not supported", DEBUG)
return get_host_ip()
@cached
def get_cluster_addr():
if config('ceph-cluster-network'):
return get_network_addrs('ceph-cluster-network')[0]
try:
return network_get_primary_address('cluster')
except NotImplementedError:
log("network-get not supported", DEBUG)
return get_host_ip()
def get_networks(config_opt='ceph-public-network'):
"""Get all configured networks from provided config option.
If public network(s) are provided, go through them and return those for
which we have an address configured.
"""
networks = config(config_opt)
if networks:
networks = networks.split()
return [n for n in networks if get_address_in_network(n)]
return []
def get_network_addrs(config_opt):
"""Get all configured public networks addresses.
If public network(s) are provided, go through them and return the
addresses we have configured on any of those networks.
"""
addrs = []
networks = config(config_opt)
if networks:
networks = networks.split()
addrs = [get_address_in_network(n) for n in networks]
addrs = [a for a in addrs if a]
if not addrs:
if networks:
msg = ("Could not find an address on any of '%s' - resolve this "
"error to retry" % (networks))
status_set('blocked', msg)
raise Exception(msg)
else:
return [get_host_ip()]
return addrs
def assert_charm_supports_ipv6():
"""Check whether we are able to support charms ipv6."""
_release = lsb_release()['DISTRIB_CODENAME'].lower()
if CompareHostReleases(_release) < "trusty":
raise Exception("IPv6 is not supported in the charms for Ubuntu "
"versions less than Trusty 14.04")