Make worker-multiplier sane in container environments
Resync charm-helpers to pickup the capped worker-multiplier changes when deploying in containers. Drop the default value for worker-multiplier of 2.0; this is now handled from within the codebase rather than via a default configuration value, reflecting the differing behaviours between container and non-container deployments. Fixup amulet tests to use common helper for action execution. Change-Id: I7cac84fde5c43d1827f753e198b3f4a8e1e4151b Closes-Bug: 1665270
This commit is contained in:
parent
ea45357fe6
commit
106686b9cd
@ -308,11 +308,13 @@ options:
|
||||
your network interface.
|
||||
worker-multiplier:
|
||||
type: float
|
||||
default: 2.0
|
||||
default:
|
||||
description: |
|
||||
The CPU core multiplier to use when configuring worker processes for
|
||||
Cinder. By default, the number of workers for each daemon is set to
|
||||
twice the number of CPU cores a service unit has.
|
||||
twice the number of CPU cores a service unit has. When deployed in
|
||||
a LXD container, this default value will be capped to 4 workers
|
||||
unless this configuration option is set.
|
||||
nagios_context:
|
||||
default: "juju"
|
||||
type: string
|
||||
|
@ -373,7 +373,7 @@ def add_init_service_checks(nrpe, services, unit_name, immediate_check=True):
|
||||
checkpath = '%s/service-check-%s.txt' % (nrpe.homedir, svc)
|
||||
croncmd = (
|
||||
'/usr/local/lib/nagios/plugins/check_exit_status.pl '
|
||||
'-s /etc/init.d/%s status' % svc
|
||||
'-e -s /etc/init.d/%s status' % svc
|
||||
)
|
||||
cron_file = '*/5 * * * * root %s > %s\n' % (croncmd, checkpath)
|
||||
f = open(cronpath, 'w')
|
||||
|
@ -111,9 +111,9 @@ def get_address_in_network(network, fallback=None, fatal=False):
|
||||
for iface in netifaces.interfaces():
|
||||
addresses = netifaces.ifaddresses(iface)
|
||||
if network.version == 4 and netifaces.AF_INET in addresses:
|
||||
addr = addresses[netifaces.AF_INET][0]['addr']
|
||||
netmask = addresses[netifaces.AF_INET][0]['netmask']
|
||||
cidr = netaddr.IPNetwork("%s/%s" % (addr, netmask))
|
||||
for addr in addresses[netifaces.AF_INET]:
|
||||
cidr = netaddr.IPNetwork("%s/%s" % (addr['addr'],
|
||||
addr['netmask']))
|
||||
if cidr in network:
|
||||
return str(cidr.ip)
|
||||
|
||||
@ -239,6 +239,16 @@ def format_ipv6_addr(address):
|
||||
return None
|
||||
|
||||
|
||||
def is_ipv6_disabled():
|
||||
try:
|
||||
result = subprocess.check_output(
|
||||
['sysctl', 'net.ipv6.conf.all.disable_ipv6'],
|
||||
stderr=subprocess.STDOUT)
|
||||
return "net.ipv6.conf.all.disable_ipv6 = 1" in result
|
||||
except subprocess.CalledProcessError:
|
||||
return True
|
||||
|
||||
|
||||
def get_iface_addr(iface='eth0', inet_type='AF_INET', inc_aliases=False,
|
||||
fatal=True, exc_list=None):
|
||||
"""Return the assigned IP address for a given interface, if any.
|
||||
@ -544,31 +554,38 @@ def assert_charm_supports_ipv6():
|
||||
"versions less than Trusty 14.04")
|
||||
|
||||
|
||||
def get_relation_ip(interface, config_override=None):
|
||||
"""Return this unit's IP for the given relation.
|
||||
def get_relation_ip(interface, cidr_network=None):
|
||||
"""Return this unit's IP for the given interface.
|
||||
|
||||
Allow for an arbitrary interface to use with network-get to select an IP.
|
||||
Handle all address selection options including configuration parameter
|
||||
override and IPv6.
|
||||
Handle all address selection options including passed cidr network and
|
||||
IPv6.
|
||||
|
||||
Usage: get_relation_ip('amqp', config_override='access-network')
|
||||
Usage: get_relation_ip('amqp', cidr_network='10.0.0.0/8')
|
||||
|
||||
@param interface: string name of the relation.
|
||||
@param config_override: string name of the config option for network
|
||||
override. Supports legacy network override configuration parameters.
|
||||
@param cidr_network: string CIDR Network to select an address from.
|
||||
@raises Exception if prefer-ipv6 is configured but IPv6 unsupported.
|
||||
@returns IPv6 or IPv4 address
|
||||
"""
|
||||
# Select the interface address first
|
||||
# For possible use as a fallback bellow with get_address_in_network
|
||||
try:
|
||||
# Get the interface specific IP
|
||||
address = network_get_primary_address(interface)
|
||||
except NotImplementedError:
|
||||
# If network-get is not available
|
||||
address = get_host_ip(unit_get('private-address'))
|
||||
|
||||
fallback = get_host_ip(unit_get('private-address'))
|
||||
if config('prefer-ipv6'):
|
||||
# Currently IPv6 has priority, eventually we want IPv6 to just be
|
||||
# another network space.
|
||||
assert_charm_supports_ipv6()
|
||||
return get_ipv6_addr()[0]
|
||||
elif config_override and config(config_override):
|
||||
return get_address_in_network(config(config_override),
|
||||
fallback)
|
||||
else:
|
||||
try:
|
||||
return network_get_primary_address(interface)
|
||||
except NotImplementedError:
|
||||
return fallback
|
||||
elif cidr_network:
|
||||
# If a specific CIDR network is passed get the address from that
|
||||
# network.
|
||||
return get_address_in_network(cidr_network, address)
|
||||
|
||||
# Return the interface address
|
||||
return address
|
||||
|
@ -547,7 +547,7 @@ class OpenStackAmuletUtils(AmuletUtils):
|
||||
"""Create the specified instance."""
|
||||
self.log.debug('Creating instance '
|
||||
'({}|{}|{})'.format(instance_name, image_name, flavor))
|
||||
image = nova.images.find(name=image_name)
|
||||
image = nova.glance.find_image(image_name)
|
||||
flavor = nova.flavors.find(name=flavor)
|
||||
instance = nova.servers.create(name=instance_name, image=image,
|
||||
flavor=flavor)
|
||||
|
@ -60,6 +60,7 @@ from charmhelpers.core.host import (
|
||||
pwgen,
|
||||
lsb_release,
|
||||
CompareHostReleases,
|
||||
is_container,
|
||||
)
|
||||
from charmhelpers.contrib.hahelpers.cluster import (
|
||||
determine_apache_port,
|
||||
@ -88,6 +89,7 @@ from charmhelpers.contrib.network.ip import (
|
||||
format_ipv6_addr,
|
||||
is_address_in_network,
|
||||
is_bridge_member,
|
||||
is_ipv6_disabled,
|
||||
)
|
||||
from charmhelpers.contrib.openstack.utils import (
|
||||
config_flags_parser,
|
||||
@ -109,6 +111,7 @@ except ImportError:
|
||||
|
||||
CA_CERT_PATH = '/usr/local/share/ca-certificates/keystone_juju_ca_cert.crt'
|
||||
ADDRESS_TYPES = ['admin', 'internal', 'public']
|
||||
HAPROXY_RUN_DIR = '/var/run/haproxy/'
|
||||
|
||||
|
||||
def ensure_packages(packages):
|
||||
@ -534,6 +537,8 @@ class HAProxyContext(OSContextGenerator):
|
||||
"""Provides half a context for the haproxy template, which describes
|
||||
all peers to be included in the cluster. Each charm needs to include
|
||||
its own context generator that describes the port mapping.
|
||||
|
||||
:side effect: mkdir is called on HAPROXY_RUN_DIR
|
||||
"""
|
||||
interfaces = ['cluster']
|
||||
|
||||
@ -541,6 +546,8 @@ class HAProxyContext(OSContextGenerator):
|
||||
self.singlenode_mode = singlenode_mode
|
||||
|
||||
def __call__(self):
|
||||
if not os.path.isdir(HAPROXY_RUN_DIR):
|
||||
mkdir(path=HAPROXY_RUN_DIR)
|
||||
if not relation_ids('cluster') and not self.singlenode_mode:
|
||||
return {}
|
||||
|
||||
@ -1221,22 +1228,54 @@ class BindHostContext(OSContextGenerator):
|
||||
return {'bind_host': '0.0.0.0'}
|
||||
|
||||
|
||||
class WorkerConfigContext(OSContextGenerator):
|
||||
MAX_DEFAULT_WORKERS = 4
|
||||
DEFAULT_MULTIPLIER = 2
|
||||
|
||||
@property
|
||||
def num_cpus(self):
|
||||
# NOTE: use cpu_count if present (16.04 support)
|
||||
if hasattr(psutil, 'cpu_count'):
|
||||
return psutil.cpu_count()
|
||||
else:
|
||||
return psutil.NUM_CPUS
|
||||
|
||||
def __call__(self):
|
||||
multiplier = config('worker-multiplier') or 0
|
||||
count = int(self.num_cpus * multiplier)
|
||||
def _calculate_workers():
|
||||
'''
|
||||
Determine the number of worker processes based on the CPU
|
||||
count of the unit containing the application.
|
||||
|
||||
Workers will be limited to MAX_DEFAULT_WORKERS in
|
||||
container environments where no worker-multipler configuration
|
||||
option been set.
|
||||
|
||||
@returns int: number of worker processes to use
|
||||
'''
|
||||
multiplier = config('worker-multiplier') or DEFAULT_MULTIPLIER
|
||||
count = int(_num_cpus() * multiplier)
|
||||
if multiplier > 0 and count == 0:
|
||||
count = 1
|
||||
ctxt = {"workers": count}
|
||||
|
||||
if config('worker-multiplier') is None and is_container():
|
||||
# NOTE(jamespage): Limit unconfigured worker-multiplier
|
||||
# to MAX_DEFAULT_WORKERS to avoid insane
|
||||
# worker configuration in LXD containers
|
||||
# on large servers
|
||||
# Reference: https://pad.lv/1665270
|
||||
count = min(count, MAX_DEFAULT_WORKERS)
|
||||
|
||||
return count
|
||||
|
||||
|
||||
def _num_cpus():
|
||||
'''
|
||||
Compatibility wrapper for calculating the number of CPU's
|
||||
a unit has.
|
||||
|
||||
@returns: int: number of CPU cores detected
|
||||
'''
|
||||
try:
|
||||
return psutil.cpu_count()
|
||||
except AttributeError:
|
||||
return psutil.NUM_CPUS
|
||||
|
||||
|
||||
class WorkerConfigContext(OSContextGenerator):
|
||||
|
||||
def __call__(self):
|
||||
ctxt = {"workers": _calculate_workers()}
|
||||
return ctxt
|
||||
|
||||
|
||||
@ -1244,7 +1283,7 @@ class WSGIWorkerConfigContext(WorkerConfigContext):
|
||||
|
||||
def __init__(self, name=None, script=None, admin_script=None,
|
||||
public_script=None, process_weight=1.00,
|
||||
admin_process_weight=0.75, public_process_weight=0.25):
|
||||
admin_process_weight=0.25, public_process_weight=0.75):
|
||||
self.service_name = name
|
||||
self.user = name
|
||||
self.group = name
|
||||
@ -1256,8 +1295,7 @@ class WSGIWorkerConfigContext(WorkerConfigContext):
|
||||
self.public_process_weight = public_process_weight
|
||||
|
||||
def __call__(self):
|
||||
multiplier = config('worker-multiplier') or 1
|
||||
total_processes = self.num_cpus * multiplier
|
||||
total_processes = _calculate_workers()
|
||||
ctxt = {
|
||||
"service_name": self.service_name,
|
||||
"user": self.user,
|
||||
@ -1588,7 +1626,7 @@ class MemcacheContext(OSContextGenerator):
|
||||
"""Memcache context
|
||||
|
||||
This context provides options for configuring a local memcache client and
|
||||
server
|
||||
server for both IPv4 and IPv6
|
||||
"""
|
||||
|
||||
def __init__(self, package=None):
|
||||
@ -1606,6 +1644,17 @@ class MemcacheContext(OSContextGenerator):
|
||||
# Trusty version of memcached does not support ::1 as a listen
|
||||
# address so use host file entry instead
|
||||
release = lsb_release()['DISTRIB_CODENAME'].lower()
|
||||
if is_ipv6_disabled():
|
||||
if CompareHostReleases(release) > 'trusty':
|
||||
ctxt['memcache_server'] = '127.0.0.1'
|
||||
else:
|
||||
ctxt['memcache_server'] = 'localhost'
|
||||
ctxt['memcache_server_formatted'] = '127.0.0.1'
|
||||
ctxt['memcache_port'] = '11211'
|
||||
ctxt['memcache_url'] = '{}:{}'.format(
|
||||
ctxt['memcache_server_formatted'],
|
||||
ctxt['memcache_port'])
|
||||
else:
|
||||
if CompareHostReleases(release) > 'trusty':
|
||||
ctxt['memcache_server'] = '::1'
|
||||
else:
|
||||
|
@ -5,6 +5,8 @@ global
|
||||
user haproxy
|
||||
group haproxy
|
||||
spread-checks 0
|
||||
stats socket /var/run/haproxy/admin.sock mode 600 level admin
|
||||
stats timeout 2m
|
||||
|
||||
defaults
|
||||
log global
|
||||
@ -58,6 +60,15 @@ frontend tcp-in_{{ service }}
|
||||
{% for frontend in frontends -%}
|
||||
backend {{ service }}_{{ frontend }}
|
||||
balance leastconn
|
||||
{% if backend_options -%}
|
||||
{% if backend_options[service] -%}
|
||||
{% for option in backend_options[service] -%}
|
||||
{% for key, value in option.items() -%}
|
||||
{{ key }} {{ value }}
|
||||
{% endfor -%}
|
||||
{% endfor -%}
|
||||
{% endif -%}
|
||||
{% endif -%}
|
||||
{% for unit, address in frontends[frontend]['backends'].items() -%}
|
||||
server {{ unit }} {{ address }}:{{ ports[1] }} check
|
||||
{% endfor %}
|
||||
|
@ -987,13 +987,15 @@ def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
|
||||
service_start(svc)
|
||||
|
||||
|
||||
def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
|
||||
def ensure_ceph_keyring(service, user=None, group=None,
|
||||
relation='ceph', key=None):
|
||||
"""Ensures a ceph keyring is created for a named service and optionally
|
||||
ensures user and group ownership.
|
||||
|
||||
Returns False if no ceph key is available in relation state.
|
||||
@returns boolean: Flag to indicate whether a key was successfully written
|
||||
to disk based on either relation data or a supplied key
|
||||
"""
|
||||
key = None
|
||||
if not key:
|
||||
for rid in relation_ids(relation):
|
||||
for unit in related_units(rid):
|
||||
key = relation_get('key', rid=rid, unit=unit)
|
||||
|
@ -191,7 +191,7 @@ def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d",
|
||||
upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
|
||||
sysv_file = os.path.join(initd_dir, service_name)
|
||||
if init_is_systemd():
|
||||
service('disable', service_name)
|
||||
service('mask', service_name)
|
||||
elif os.path.exists(upstart_file):
|
||||
override_path = os.path.join(
|
||||
init_dir, '{}.override'.format(service_name))
|
||||
@ -224,7 +224,7 @@ def service_resume(service_name, init_dir="/etc/init",
|
||||
upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
|
||||
sysv_file = os.path.join(initd_dir, service_name)
|
||||
if init_is_systemd():
|
||||
service('enable', service_name)
|
||||
service('unmask', service_name)
|
||||
elif os.path.exists(upstart_file):
|
||||
override_path = os.path.join(
|
||||
init_dir, '{}.override'.format(service_name))
|
||||
|
@ -17,9 +17,6 @@
|
||||
import amulet
|
||||
import os
|
||||
import yaml
|
||||
import time
|
||||
import json
|
||||
import subprocess
|
||||
|
||||
from charmhelpers.contrib.openstack.amulet.deployment import (
|
||||
OpenStackAmuletDeployment
|
||||
@ -170,32 +167,6 @@ class CinderBasicDeployment(OpenStackAmuletDeployment):
|
||||
# Authenticate admin with glance endpoint
|
||||
self.glance = u.authenticate_glance_admin(self.keystone)
|
||||
|
||||
def _run_action(self, unit_id, action, *args):
|
||||
command = ["juju", "action", "do", "--format=json", unit_id, action]
|
||||
command.extend(args)
|
||||
print("Running command: %s\n" % " ".join(command))
|
||||
output = subprocess.check_output(command)
|
||||
output_json = output.decode(encoding="UTF-8")
|
||||
data = json.loads(output_json)
|
||||
action_id = data[u'Action queued with id']
|
||||
return action_id
|
||||
|
||||
def _wait_on_action(self, action_id):
|
||||
command = ["juju", "action", "fetch", "--format=json", action_id]
|
||||
while True:
|
||||
try:
|
||||
output = subprocess.check_output(command)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return False
|
||||
output_json = output.decode(encoding="UTF-8")
|
||||
data = json.loads(output_json)
|
||||
if data[u"status"] == "completed":
|
||||
return True
|
||||
elif data[u"status"] == "failed":
|
||||
return False
|
||||
time.sleep(2)
|
||||
|
||||
def _extend_cinder_volume(self, vol_id, new_size=2):
|
||||
"""Extend an existing cinder volume size.
|
||||
|
||||
@ -813,16 +784,16 @@ class CinderBasicDeployment(OpenStackAmuletDeployment):
|
||||
assert u.status_get(unit)[0] == "active"
|
||||
|
||||
u.log.debug('Running pause action on {}'.format(unit_name))
|
||||
action_id = self._run_action(unit_name, "pause")
|
||||
action_id = u.run_action(unit, "pause")
|
||||
u.log.debug('Waiting on action {}'.format(action_id))
|
||||
assert self._wait_on_action(action_id), "Pause action failed."
|
||||
assert u.wait_on_action(action_id), "Pause action failed."
|
||||
u.log.debug('Checking for maintenance status on {}'.format(unit_name))
|
||||
assert u.status_get(unit)[0] == "maintenance"
|
||||
|
||||
u.log.debug('Running resume action on {}'.format(unit_name))
|
||||
action_id = self._run_action(unit_name, "resume")
|
||||
action_id = u.run_action(unit, "resume")
|
||||
u.log.debug('Waiting on action {}'.format(action_id))
|
||||
assert self._wait_on_action(action_id), "Resume action failed."
|
||||
assert u.wait_on_action(action_id), "Resume action failed."
|
||||
u.log.debug('Checking for active status on {}'.format(unit_name))
|
||||
assert u.status_get(unit)[0] == "active"
|
||||
u.log.debug('OK')
|
||||
|
@ -547,7 +547,7 @@ class OpenStackAmuletUtils(AmuletUtils):
|
||||
"""Create the specified instance."""
|
||||
self.log.debug('Creating instance '
|
||||
'({}|{}|{})'.format(instance_name, image_name, flavor))
|
||||
image = nova.images.find(name=image_name)
|
||||
image = nova.glance.find_image(image_name)
|
||||
flavor = nova.flavors.find(name=flavor)
|
||||
instance = nova.servers.create(name=instance_name, image=image,
|
||||
flavor=flavor)
|
||||
|
@ -191,7 +191,7 @@ def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d",
|
||||
upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
|
||||
sysv_file = os.path.join(initd_dir, service_name)
|
||||
if init_is_systemd():
|
||||
service('disable', service_name)
|
||||
service('mask', service_name)
|
||||
elif os.path.exists(upstart_file):
|
||||
override_path = os.path.join(
|
||||
init_dir, '{}.override'.format(service_name))
|
||||
@ -224,7 +224,7 @@ def service_resume(service_name, init_dir="/etc/init",
|
||||
upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
|
||||
sysv_file = os.path.join(initd_dir, service_name)
|
||||
if init_is_systemd():
|
||||
service('enable', service_name)
|
||||
service('unmask', service_name)
|
||||
elif os.path.exists(upstart_file):
|
||||
override_path = os.path.join(
|
||||
init_dir, '{}.override'.format(service_name))
|
||||
|
Loading…
Reference in New Issue
Block a user