638 lines
19 KiB
Python
638 lines
19 KiB
Python
#!/usr/bin/python
|
|
import ast
|
|
import pcmk
|
|
import maas
|
|
import os
|
|
import subprocess
|
|
import socket
|
|
import fcntl
|
|
import struct
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from base64 import b64decode
|
|
|
|
from charmhelpers.core.hookenv import (
|
|
local_unit,
|
|
log,
|
|
DEBUG,
|
|
INFO,
|
|
WARNING,
|
|
relation_get,
|
|
related_units,
|
|
relation_ids,
|
|
config,
|
|
unit_get,
|
|
status_set,
|
|
)
|
|
from charmhelpers.contrib.openstack.utils import (
|
|
get_host_ip,
|
|
set_unit_paused,
|
|
clear_unit_paused,
|
|
is_unit_paused_set,
|
|
)
|
|
from charmhelpers.core.host import (
|
|
service_start,
|
|
service_stop,
|
|
service_restart,
|
|
service_running,
|
|
write_file,
|
|
file_hash,
|
|
lsb_release
|
|
)
|
|
from charmhelpers.fetch import (
|
|
apt_install,
|
|
)
|
|
from charmhelpers.contrib.hahelpers.cluster import (
|
|
peer_ips,
|
|
)
|
|
from charmhelpers.contrib.network import ip as utils
|
|
|
|
try:
|
|
import netifaces
|
|
except ImportError:
|
|
apt_install('python-netifaces')
|
|
import netifaces
|
|
|
|
try:
|
|
from netaddr import IPNetwork
|
|
except ImportError:
|
|
apt_install('python-netaddr', fatal=True)
|
|
from netaddr import IPNetwork
|
|
|
|
|
|
try:
|
|
import jinja2
|
|
except ImportError:
|
|
apt_install('python-jinja2', fatal=True)
|
|
import jinja2
|
|
|
|
|
|
TEMPLATES_DIR = 'templates'
|
|
COROSYNC_CONF = '/etc/corosync/corosync.conf'
|
|
COROSYNC_DEFAULT = '/etc/default/corosync'
|
|
COROSYNC_AUTHKEY = '/etc/corosync/authkey'
|
|
COROSYNC_HACLUSTER_ACL = '/etc/corosync/uidgid.d/hacluster'
|
|
COROSYNC_CONF_FILES = [
|
|
COROSYNC_DEFAULT,
|
|
COROSYNC_AUTHKEY,
|
|
COROSYNC_CONF,
|
|
COROSYNC_HACLUSTER_ACL,
|
|
]
|
|
SUPPORTED_TRANSPORTS = ['udp', 'udpu', 'multicast', 'unicast']
|
|
|
|
|
|
def disable_upstart_services(*services):
|
|
for service in services:
|
|
with open("/etc/init/{}.override".format(service), "w") as override:
|
|
override.write("manual")
|
|
|
|
|
|
def enable_upstart_services(*services):
|
|
for service in services:
|
|
path = '/etc/init/{}.override'.format(service)
|
|
if os.path.exists(path):
|
|
os.remove(path)
|
|
|
|
|
|
def disable_lsb_services(*services):
|
|
for service in services:
|
|
subprocess.check_call(['update-rc.d', '-f', service, 'remove'])
|
|
|
|
|
|
def enable_lsb_services(*services):
|
|
for service in services:
|
|
subprocess.check_call(['update-rc.d', '-f', service, 'defaults'])
|
|
|
|
|
|
def get_iface_ipaddr(iface):
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
return socket.inet_ntoa(fcntl.ioctl(
|
|
s.fileno(),
|
|
0x8919, # SIOCGIFADDR
|
|
struct.pack('256s', iface[:15])
|
|
)[20:24])
|
|
|
|
|
|
def get_iface_netmask(iface):
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
return socket.inet_ntoa(fcntl.ioctl(
|
|
s.fileno(),
|
|
0x891b, # SIOCGIFNETMASK
|
|
struct.pack('256s', iface[:15])
|
|
)[20:24])
|
|
|
|
|
|
def get_netmask_cidr(netmask):
|
|
netmask = netmask.split('.')
|
|
binary_str = ''
|
|
for octet in netmask:
|
|
binary_str += bin(int(octet))[2:].zfill(8)
|
|
return str(len(binary_str.rstrip('0')))
|
|
|
|
|
|
def get_network_address(iface):
|
|
if iface:
|
|
iface = str(iface)
|
|
network = "{}/{}".format(get_iface_ipaddr(iface),
|
|
get_netmask_cidr(get_iface_netmask(iface)))
|
|
ip = IPNetwork(network)
|
|
return str(ip.network)
|
|
else:
|
|
return None
|
|
|
|
|
|
def get_ipv6_network_address(iface):
|
|
# Behave in same way as ipv4 get_network_address() above if iface is None.
|
|
if not iface:
|
|
return None
|
|
|
|
try:
|
|
ipv6_addr = utils.get_ipv6_addr(iface=iface)[0]
|
|
all_addrs = netifaces.ifaddresses(iface)
|
|
|
|
for addr in all_addrs[netifaces.AF_INET6]:
|
|
if ipv6_addr == addr['addr']:
|
|
network = "{}/{}".format(addr['addr'], addr['netmask'])
|
|
return str(IPNetwork(network).network)
|
|
|
|
except ValueError:
|
|
msg = "Invalid interface '%s'" % iface
|
|
status_set('blocked', msg)
|
|
raise Exception(msg)
|
|
|
|
msg = "No valid network found for interface '%s'" % iface
|
|
status_set('blocked', msg)
|
|
raise Exception(msg)
|
|
|
|
|
|
def get_corosync_id(unit_name):
|
|
# Corosync nodeid 0 is reserved so increase all the nodeids to avoid it
|
|
off_set = 1000
|
|
return off_set + int(unit_name.split('/')[1])
|
|
|
|
|
|
def nulls(data):
|
|
"""Returns keys of values that are null (but not bool)"""
|
|
return [k for k in data.iterkeys()
|
|
if not isinstance(data[k], bool) and not data[k]]
|
|
|
|
|
|
def get_corosync_conf():
|
|
if config('prefer-ipv6'):
|
|
ip_version = 'ipv6'
|
|
bindnetaddr = get_ipv6_network_address
|
|
else:
|
|
ip_version = 'ipv4'
|
|
bindnetaddr = get_network_address
|
|
|
|
# NOTE(jamespage) use local charm configuration over any provided by
|
|
# principle charm
|
|
conf = {
|
|
'corosync_bindnetaddr':
|
|
bindnetaddr(config('corosync_bindiface')),
|
|
'corosync_mcastport': config('corosync_mcastport'),
|
|
'corosync_mcastaddr': config('corosync_mcastaddr'),
|
|
'ip_version': ip_version,
|
|
'ha_nodes': get_ha_nodes(),
|
|
'transport': get_transport(),
|
|
}
|
|
|
|
if config('prefer-ipv6'):
|
|
conf['nodeid'] = get_corosync_id(local_unit())
|
|
|
|
if config('netmtu'):
|
|
conf['netmtu'] = config('netmtu')
|
|
|
|
if config('debug'):
|
|
conf['debug'] = config('debug')
|
|
|
|
if not nulls(conf):
|
|
log("Found sufficient values in local config to populate "
|
|
"corosync.conf", level=DEBUG)
|
|
return conf
|
|
|
|
conf = {}
|
|
for relid in relation_ids('ha'):
|
|
for unit in related_units(relid):
|
|
bindiface = relation_get('corosync_bindiface',
|
|
unit, relid)
|
|
conf = {
|
|
'corosync_bindnetaddr': bindnetaddr(bindiface),
|
|
'corosync_mcastport': relation_get('corosync_mcastport',
|
|
unit, relid),
|
|
'corosync_mcastaddr': config('corosync_mcastaddr'),
|
|
'ip_version': ip_version,
|
|
'ha_nodes': get_ha_nodes(),
|
|
'transport': get_transport(),
|
|
}
|
|
|
|
if config('prefer-ipv6'):
|
|
conf['nodeid'] = get_corosync_id(local_unit())
|
|
|
|
if config('netmtu'):
|
|
conf['netmtu'] = config('netmtu')
|
|
|
|
if config('debug'):
|
|
conf['debug'] = config('debug')
|
|
|
|
# Values up to this point must be non-null
|
|
if nulls(conf):
|
|
continue
|
|
|
|
return conf
|
|
|
|
missing = [k for k, v in conf.iteritems() if v is None]
|
|
log('Missing required configuration: %s' % missing)
|
|
return None
|
|
|
|
|
|
def emit_corosync_conf():
|
|
corosync_conf_context = get_corosync_conf()
|
|
if corosync_conf_context:
|
|
write_file(path=COROSYNC_CONF,
|
|
content=render_template('corosync.conf',
|
|
corosync_conf_context))
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def emit_base_conf():
|
|
corosync_default_context = {'corosync_enabled': 'yes'}
|
|
write_file(path=COROSYNC_DEFAULT,
|
|
content=render_template('corosync',
|
|
corosync_default_context))
|
|
|
|
write_file(path=COROSYNC_HACLUSTER_ACL,
|
|
content=render_template('hacluster.acl', {}))
|
|
|
|
corosync_key = config('corosync_key')
|
|
if corosync_key:
|
|
write_file(path=COROSYNC_AUTHKEY,
|
|
content=b64decode(corosync_key),
|
|
perms=0o400)
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def render_template(template_name, context, template_dir=TEMPLATES_DIR):
|
|
templates = jinja2.Environment(
|
|
loader=jinja2.FileSystemLoader(template_dir)
|
|
)
|
|
template = templates.get_template(template_name)
|
|
return template.render(context)
|
|
|
|
|
|
def assert_charm_supports_ipv6():
|
|
"""Check whether we are able to support charms ipv6."""
|
|
if lsb_release()['DISTRIB_CODENAME'].lower() < "trusty":
|
|
msg = "IPv6 is not supported in the charms for Ubuntu " \
|
|
"versions less than Trusty 14.04"
|
|
status_set('blocked', msg)
|
|
raise Exception(msg)
|
|
|
|
|
|
def get_transport():
|
|
transport = config('corosync_transport')
|
|
_deprecated_transport_values = {"multicast": "udp", "unicast": "udpu"}
|
|
val = _deprecated_transport_values.get(transport, transport)
|
|
if val not in ['udp', 'udpu']:
|
|
msg = ("Unsupported corosync_transport type '%s' - supported "
|
|
"types are: %s" % (transport, ', '.join(SUPPORTED_TRANSPORTS)))
|
|
status_set('blocked', msg)
|
|
raise ValueError(msg)
|
|
|
|
return val
|
|
|
|
|
|
def get_ipv6_addr():
|
|
"""Exclude any ip addresses configured or managed by corosync."""
|
|
excludes = []
|
|
for rid in relation_ids('ha'):
|
|
for unit in related_units(rid):
|
|
resources = parse_data(rid, unit, 'resources')
|
|
for res in resources.itervalues():
|
|
if 'ocf:heartbeat:IPv6addr' in res:
|
|
res_params = parse_data(rid, unit, 'resource_params')
|
|
res_p = res_params.get(res)
|
|
if res_p:
|
|
for k, v in res_p.itervalues():
|
|
if utils.is_ipv6(v):
|
|
log("Excluding '%s' from address list" % v,
|
|
level=DEBUG)
|
|
excludes.append(v)
|
|
|
|
return utils.get_ipv6_addr(exc_list=excludes)[0]
|
|
|
|
|
|
def get_ha_nodes():
|
|
ha_units = peer_ips(peer_relation='hanode')
|
|
ha_nodes = {}
|
|
for unit in ha_units:
|
|
corosync_id = get_corosync_id(unit)
|
|
addr = ha_units[unit]
|
|
if config('prefer-ipv6'):
|
|
if not utils.is_ipv6(addr):
|
|
# Not an error since cluster may still be forming/updating
|
|
log("Expected an ipv6 address but got %s" % (addr),
|
|
level=WARNING)
|
|
|
|
ha_nodes[corosync_id] = addr
|
|
else:
|
|
ha_nodes[corosync_id] = get_host_ip(addr)
|
|
|
|
corosync_id = get_corosync_id(local_unit())
|
|
if config('prefer-ipv6'):
|
|
addr = get_ipv6_addr()
|
|
else:
|
|
addr = get_host_ip(unit_get('private-address'))
|
|
|
|
ha_nodes[corosync_id] = addr
|
|
|
|
return ha_nodes
|
|
|
|
|
|
def get_cluster_nodes():
|
|
hosts = []
|
|
if config('prefer-ipv6'):
|
|
hosts.append(get_ipv6_addr())
|
|
else:
|
|
hosts.append(unit_get('private-address'))
|
|
|
|
for relid in relation_ids('hanode'):
|
|
for unit in related_units(relid):
|
|
if relation_get('ready', rid=relid, unit=unit):
|
|
hosts.append(relation_get('private-address', unit, relid))
|
|
|
|
hosts.sort()
|
|
return hosts
|
|
|
|
|
|
def parse_data(relid, unit, key):
|
|
"""Simple helper to ast parse relation data"""
|
|
data = relation_get(key, unit, relid)
|
|
if data:
|
|
return ast.literal_eval(data)
|
|
|
|
return {}
|
|
|
|
|
|
def configure_stonith():
|
|
if config('stonith_enabled') not in ['true', 'True', True]:
|
|
log('Disabling STONITH', level=INFO)
|
|
cmd = "crm configure property stonith-enabled=false"
|
|
pcmk.commit(cmd)
|
|
else:
|
|
log('Enabling STONITH for all nodes in cluster.', level=INFO)
|
|
# configure stontih resources for all nodes in cluster.
|
|
# note: this is totally provider dependent and requires
|
|
# access to the MAAS API endpoint, using endpoint and credentials
|
|
# set in config.
|
|
url = config('maas_url')
|
|
creds = config('maas_credentials')
|
|
if None in [url, creds]:
|
|
msg = 'maas_url and maas_credentials must be set ' \
|
|
'in config to enable STONITH.'
|
|
status_set('blocked', msg)
|
|
raise Exception(msg)
|
|
|
|
nodes = maas.MAASHelper(url, creds).list_nodes()
|
|
if not nodes:
|
|
msg = 'Could not obtain node inventory from ' \
|
|
'MAAS @ %s.' % url
|
|
status_set('blocked', msg)
|
|
raise Exception(msg)
|
|
|
|
cluster_nodes = pcmk.list_nodes()
|
|
for node in cluster_nodes:
|
|
rsc, constraint = pcmk.maas_stonith_primitive(nodes, node)
|
|
if not rsc:
|
|
msg = 'Failed to determine STONITH primitive for ' \
|
|
'node %s' % node
|
|
status_set('blocked', msg)
|
|
raise Exception(msg)
|
|
|
|
rsc_name = str(rsc).split(' ')[1]
|
|
if not pcmk.is_resource_present(rsc_name):
|
|
log('Creating new STONITH primitive %s.' % rsc_name,
|
|
level=DEBUG)
|
|
cmd = 'crm -F configure %s' % rsc
|
|
pcmk.commit(cmd)
|
|
if constraint:
|
|
cmd = 'crm -F configure %s' % constraint
|
|
pcmk.commit(cmd)
|
|
else:
|
|
log('STONITH primitive already exists for node.', level=DEBUG)
|
|
|
|
pcmk.commit("crm configure property stonith-enabled=true")
|
|
|
|
|
|
def configure_monitor_host():
|
|
"""Configure extra monitor host for better network failure detection"""
|
|
log('Checking monitor host configuration', level=DEBUG)
|
|
monitor_host = config('monitor_host')
|
|
if monitor_host:
|
|
if not pcmk.crm_opt_exists('ping'):
|
|
log('Implementing monitor host configuration (host: %s)' %
|
|
monitor_host, level=DEBUG)
|
|
monitor_interval = config('monitor_interval')
|
|
cmd = ('crm -w -F configure primitive ping '
|
|
'ocf:pacemaker:ping params host_list="%s" '
|
|
'multiplier="100" op monitor interval="%s" ' %
|
|
(monitor_host, monitor_interval))
|
|
pcmk.commit(cmd)
|
|
cmd = ('crm -w -F configure clone cl_ping ping '
|
|
'meta interleave="true"')
|
|
pcmk.commit(cmd)
|
|
else:
|
|
log('Reconfiguring monitor host configuration (host: %s)' %
|
|
monitor_host, level=DEBUG)
|
|
cmd = ('crm -w -F resource param ping set host_list="%s"' %
|
|
monitor_host)
|
|
else:
|
|
if pcmk.crm_opt_exists('ping'):
|
|
log('Disabling monitor host configuration', level=DEBUG)
|
|
pcmk.commit('crm -w -F resource stop ping')
|
|
pcmk.commit('crm -w -F configure delete ping')
|
|
|
|
|
|
def configure_cluster_global():
|
|
"""Configure global cluster options"""
|
|
log('Applying global cluster configuration', level=DEBUG)
|
|
if int(config('cluster_count')) >= 3:
|
|
# NOTE(jamespage) if 3 or more nodes, then quorum can be
|
|
# managed effectively, so stop if quorum lost
|
|
log('Configuring no-quorum-policy to stop', level=DEBUG)
|
|
cmd = "crm configure property no-quorum-policy=stop"
|
|
else:
|
|
# NOTE(jamespage) if less that 3 nodes, quorum not possible
|
|
# so ignore
|
|
log('Configuring no-quorum-policy to ignore', level=DEBUG)
|
|
cmd = "crm configure property no-quorum-policy=ignore"
|
|
|
|
pcmk.commit(cmd)
|
|
cmd = ('crm configure rsc_defaults $id="rsc-options" '
|
|
'resource-stickiness="100"')
|
|
pcmk.commit(cmd)
|
|
|
|
|
|
def restart_corosync_on_change():
|
|
"""Simple decorator to restart corosync if any of its config changes"""
|
|
def wrap(f):
|
|
def wrapped_f(*args, **kwargs):
|
|
checksums = {}
|
|
for path in COROSYNC_CONF_FILES:
|
|
checksums[path] = file_hash(path)
|
|
return_data = f(*args, **kwargs)
|
|
# NOTE: this assumes that this call is always done around
|
|
# configure_corosync, which returns true if configuration
|
|
# files where actually generated
|
|
if return_data:
|
|
for path in COROSYNC_CONF_FILES:
|
|
if checksums[path] != file_hash(path):
|
|
restart_corosync()
|
|
break
|
|
|
|
return return_data
|
|
return wrapped_f
|
|
return wrap
|
|
|
|
|
|
@restart_corosync_on_change()
|
|
def configure_corosync():
|
|
log('Configuring and (maybe) restarting corosync', level=DEBUG)
|
|
return emit_base_conf() and emit_corosync_conf()
|
|
|
|
|
|
def restart_corosync():
|
|
if service_running("pacemaker"):
|
|
service_stop("pacemaker")
|
|
|
|
if not is_unit_paused_set():
|
|
service_restart("corosync")
|
|
service_start("pacemaker")
|
|
|
|
|
|
def is_in_standby_mode(node_name):
|
|
"""Check if node is in standby mode in pacemaker
|
|
|
|
@param node_name: The name of the node to check
|
|
@returns boolean - True if node_name is in standby mode
|
|
"""
|
|
out = subprocess.check_output(['crm', 'node', 'status', node_name])
|
|
root = ET.fromstring(out)
|
|
|
|
standby_mode = False
|
|
for nvpair in root.iter('nvpair'):
|
|
if (nvpair.attrib.get('name') == 'standby' and
|
|
nvpair.attrib.get('value') == 'on'):
|
|
standby_mode = True
|
|
return standby_mode
|
|
|
|
|
|
def get_hostname():
|
|
"""Return the hostname of this unit
|
|
|
|
@returns hostname
|
|
"""
|
|
return subprocess.check_output(['uname', '-n']).rstrip()
|
|
|
|
|
|
def enter_standby_mode(node_name, duration='forever'):
|
|
"""Put this node into standby mode in pacemaker
|
|
|
|
@returns None
|
|
"""
|
|
subprocess.check_call(['crm', 'node', 'standby', node_name, duration])
|
|
|
|
|
|
def leave_standby_mode(node_name):
|
|
"""Take this node out of standby mode in pacemaker
|
|
|
|
@returns None
|
|
"""
|
|
subprocess.check_call(['crm', 'node', 'online', node_name])
|
|
|
|
|
|
def node_has_resources(node_name):
|
|
"""Check if this node is running resources
|
|
|
|
@param node_name: The name of the node to check
|
|
@returns boolean - True if node_name has resources
|
|
"""
|
|
out = subprocess.check_output(['crm_mon', '-X'])
|
|
root = ET.fromstring(out)
|
|
has_resources = False
|
|
for resource in root.iter('resource'):
|
|
for child in resource:
|
|
if child.tag == 'node' and child.attrib.get('name') == node_name:
|
|
has_resources = True
|
|
return has_resources
|
|
|
|
|
|
def set_unit_status():
|
|
"""Set the workload status for this unit
|
|
|
|
@returns None
|
|
"""
|
|
status, messages = assess_status_helper()
|
|
status_set(status, messages)
|
|
|
|
|
|
def resume_unit():
|
|
"""Resume services on this unit and update the units status
|
|
|
|
@returns None
|
|
"""
|
|
node_name = get_hostname()
|
|
messages = []
|
|
leave_standby_mode(node_name)
|
|
if is_in_standby_mode(node_name):
|
|
messages.append("Node still in standby mode")
|
|
if messages:
|
|
raise Exception("Couldn't resume: {}".format("; ".join(messages)))
|
|
else:
|
|
clear_unit_paused()
|
|
set_unit_status()
|
|
|
|
|
|
def pause_unit():
|
|
"""Pause services on this unit and update the units status
|
|
|
|
@returns None
|
|
"""
|
|
node_name = get_hostname()
|
|
messages = []
|
|
enter_standby_mode(node_name)
|
|
if not is_in_standby_mode(node_name):
|
|
messages.append("Node not in standby mode")
|
|
if node_has_resources(node_name):
|
|
messages.append("Resources still running on unit")
|
|
status, message = assess_status_helper()
|
|
if status != 'active':
|
|
messages.append(message)
|
|
if messages:
|
|
raise Exception("Couldn't pause: {}".format("; ".join(messages)))
|
|
else:
|
|
set_unit_paused()
|
|
status_set("maintenance",
|
|
"Paused. Use 'resume' action to resume normal service.")
|
|
|
|
|
|
def assess_status_helper():
|
|
"""Assess status of unit
|
|
|
|
@returns status, message - status is workload status and message is any
|
|
corresponding messages
|
|
"""
|
|
node_count = int(config('cluster_count'))
|
|
status = 'active'
|
|
message = 'Unit is ready and clustered'
|
|
for relid in relation_ids('hanode'):
|
|
if len(related_units(relid)) + 1 < node_count:
|
|
status = 'blocked'
|
|
message = ("Insufficient peer units for ha cluster "
|
|
"(require {})".format(node_count))
|
|
return status, message
|