Amphora vertical scaling optimization in Octavia

Uses the amphora agent for optimizing HAProxy for vertical scaling:

Set cpu_map setting so that HAProxy pins each of its worker threads to
one specific CPU (except CPU0, which remains reserved for other tasks).
In order not
to change previous behavior this feature will only be enabled if the
amphora image was built with vCPU vertical scaling enabled (the new
default).
This change assumes an amphora image with HAProxy version 1.5 or higher.

Story: 2010236
Task: 46043
Change-Id: Ifbbe714c66117e57f96614534e6f20a9634c26eb
This commit is contained in:
Tom Weininger 2022-07-08 18:01:10 +02:00
parent c1ebe18e2a
commit 0b1b6c58f5
8 changed files with 158 additions and 47 deletions

View File

@ -17,6 +17,7 @@ import re
import socket
import subprocess
from oslo_log import log as logging
import pyroute2
import webob
@ -26,6 +27,8 @@ from octavia.amphorae.backends.utils import network_utils
from octavia.common import constants as consts
from octavia.common import exceptions
LOG = logging.getLogger(__name__)
class AmphoraInfo(object):
def __init__(self, osutils):
@ -60,6 +63,9 @@ class AmphoraInfo(object):
meminfo = self._get_meminfo()
cpu = self._cpu()
st = os.statvfs('/')
listeners = (
sorted(set(haproxy_listener_list + lvs_listener_list))
if lvs_listener_list else haproxy_listener_list)
body = {'hostname': socket.gethostname(),
'haproxy_version':
self._get_version_of_installed_package('haproxy'),
@ -68,6 +74,7 @@ class AmphoraInfo(object):
'active': True,
'haproxy_count':
self._count_haproxy_processes(haproxy_listener_list),
'cpu_count': os.cpu_count(),
'cpu': {
'total': cpu['total'],
'user': cpu['user'],
@ -85,11 +92,10 @@ class AmphoraInfo(object):
'used': (st.f_blocks - st.f_bfree) * st.f_frsize,
'available': st.f_bavail * st.f_frsize},
'load': self._load(),
'active_tuned_profiles': self._get_active_tuned_profiles(),
'topology': consts.TOPOLOGY_SINGLE,
'topology_status': consts.TOPOLOGY_STATUS_OK,
'listeners': sorted(list(
set(haproxy_listener_list + lvs_listener_list)))
if lvs_listener_list else haproxy_listener_list,
'listeners': listeners,
'packages': {}}
if extend_body:
body.update(extend_body)
@ -188,3 +194,12 @@ class AmphoraInfo(object):
status=404)
return webob.Response(json=dict(message='OK', interface=interface),
status=200)
def _get_active_tuned_profiles(self) -> str:
"""Returns the active TuneD profile(s)"""
try:
with open("/etc/tuned/active_profile", "r", encoding="utf-8") as f:
return f.read(1024).strip()
except OSError as ex:
LOG.debug("Reading active TuneD profiles failed: %r", ex)
return ""

View File

@ -219,10 +219,13 @@ class HaproxyAmphoraLoadBalancerDriver(
if has_tcp and not split_config:
if listeners_to_update:
# Generate HaProxy configuration from listener object
amp_details = self.clients[amphora.api_version].get_details(
amphora)
config = self.jinja_combo.build_config(
host_amphora=amphora, listeners=listeners_to_update,
tls_certs=certs,
haproxy_versions=haproxy_versions)
haproxy_versions=haproxy_versions,
amp_details=amp_details)
self.clients[amphora.api_version].upload_config(
amphora, loadbalancer.id, config,
timeout_dict=timeout_dict)

View File

@ -14,6 +14,7 @@
import os
import re
from typing import Optional
import jinja2
from octavia_lib.common import constants as lib_consts
@ -85,11 +86,12 @@ class JinjaTemplater(object):
self.connection_logging = connection_logging
def build_config(self, host_amphora, listeners, tls_certs,
haproxy_versions, socket_path=None):
haproxy_versions, amp_details, socket_path=None):
"""Convert a logical configuration to the HAProxy version
:param host_amphora: The Amphora this configuration is hosted on
:param listener: The listener configuration
:param amp_details: Detail information from the amphora
:param socket_path: The socket path for Haproxy process
:return: Rendered configuration
"""
@ -115,7 +117,8 @@ class JinjaTemplater(object):
return self.render_loadbalancer_obj(
host_amphora, listeners, tls_certs=tls_certs,
socket_path=socket_path,
feature_compatibility=feature_compatibility)
feature_compatibility=feature_compatibility,
amp_details=amp_details)
def _get_template(self):
"""Returns the specified Jinja configuration template."""
@ -152,13 +155,15 @@ class JinjaTemplater(object):
def render_loadbalancer_obj(self, host_amphora, listeners,
tls_certs=None, socket_path=None,
feature_compatibility=None):
feature_compatibility=None,
amp_details: Optional[dict] = None):
"""Renders a templated configuration from a load balancer object
:param host_amphora: The Amphora this configuration is hosted on
:param listener: The listener configuration
:param tls_certs: Dict of the TLS certificates for the listener
:param socket_path: The socket path for Haproxy process
:param amp_details: Detail information from the amphora
:return: Rendered configuration
"""
feature_compatibility = feature_compatibility or {}
@ -167,7 +172,7 @@ class JinjaTemplater(object):
listeners[0].load_balancer,
listeners,
tls_certs,
feature_compatibility,)
feature_compatibility)
if not socket_path:
socket_path = '%s/%s.sock' % (self.base_amp_path,
listeners[0].load_balancer.id)
@ -175,28 +180,37 @@ class JinjaTemplater(object):
self.base_amp_path,
listeners[0].load_balancer.id) if feature_compatibility.get(
constants.SERVER_STATE_FILE) else ''
prometheus_listener = False
for listener in listeners:
if listener.protocol == lib_consts.PROTOCOL_PROMETHEUS:
prometheus_listener = True
break
prometheus_listener = any(
lsnr.protocol == lib_consts.PROTOCOL_PROMETHEUS for lsnr in
listeners)
require_insecure_fork = feature_compatibility.get(
constants.INSECURE_FORK)
enable_prometheus = prometheus_listener and feature_compatibility.get(
lib_consts.PROTOCOL_PROMETHEUS, False)
return self._get_template().render(
{'loadbalancer': loadbalancer,
jinja_dict = {
'loadbalancer': loadbalancer,
'stats_sock': socket_path,
'log_http': self.log_http,
'log_server': self.log_server,
'state_file': state_file_path,
'administrative_log_facility':
CONF.amphora_agent.administrative_log_facility,
'user_log_facility': CONF.amphora_agent.user_log_facility,
'user_log_facility':
CONF.amphora_agent.user_log_facility,
'connection_logging': self.connection_logging,
'enable_prometheus': enable_prometheus,
'require_insecure_fork': require_insecure_fork},
constants=constants, lib_consts=lib_consts)
'require_insecure_fork': require_insecure_fork,
}
try:
# Enable cpu-pinning only if the amphora TuneD profile is active
if "amphora" in amp_details["active_tuned_profiles"].split():
jinja_dict["cpu_count"] = int(amp_details["cpu_count"])
except (KeyError, TypeError):
pass
return self._get_template().render(
jinja_dict, constants=constants, lib_consts=lib_consts)
def _transform_loadbalancer(self, host_amphora, loadbalancer, listeners,
tls_certs, feature_compatibility):

View File

@ -26,6 +26,10 @@ global
{% if loadbalancer.global_connection_limit is defined %}
maxconn {{ loadbalancer.global_connection_limit }}
{% endif %}
{%- if cpu_count is defined and cpu_count > 1 %}
nbthread {{ cpu_count - 1 }}
cpu-map auto:1/1-{{ cpu_count - 1 }} 1-{{ cpu_count - 1 }}
{%- endif %}
{% set found_ns = namespace(found=false) %}
{% for listener in loadbalancer.listeners if listener.enabled %}
{% for pool in listener.pools if pool.enabled %}

View File

@ -2964,10 +2964,14 @@ class TestServerTestCase(base.TestCase):
haproxy_count = random.randrange(0, 100)
mock_count_haproxy.return_value = haproxy_count
tuned_profiles = "virtual-guest optimize-serial-console amphora"
expected_dict = {'active': True, 'api_version': '1.0',
expected_dict = {'active': True,
'active_tuned_profiles': tuned_profiles,
'api_version': '1.0',
'cpu': {'soft_irq': cpu_softirq, 'system': cpu_system,
'total': cpu_total, 'user': cpu_user},
'cpu_count': os.cpu_count(),
'disk': {'available': disk_available,
'used': disk_used},
'haproxy_count': haproxy_count,
@ -2995,6 +2999,9 @@ class TestServerTestCase(base.TestCase):
'topology_status': consts.TOPOLOGY_STATUS_OK,
'lvs_listener_process_count': 0}
with mock.patch("octavia.amphorae.backends.agent.api_server"
".amphora_info.open",
mock.mock_open(read_data=tuned_profiles)):
if distro == consts.UBUNTU:
rv = self.ubuntu_app.get('/' + api_server.VERSION + '/details')
elif distro == consts.CENTOS:

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import random
from unittest import mock
@ -151,11 +152,13 @@ class TestAmphoraInfo(base.TestCase):
original_version = api_server.VERSION
api_server.VERSION = self.API_VERSION
expected_dict = {u'active': True,
'active_tuned_profiles': '',
u'api_version': self.API_VERSION,
u'cpu': {u'soft_irq': u'8336',
u'system': u'52554',
u'total': 7503411,
u'user': u'252551'},
'cpu_count': os.cpu_count(),
u'disk': {u'available': 109079126016,
u'used': 25718685696},
u'haproxy_count': 5,
@ -235,11 +238,13 @@ class TestAmphoraInfo(base.TestCase):
original_version = api_server.VERSION
api_server.VERSION = self.API_VERSION
expected_dict = {u'active': True,
'active_tuned_profiles': '',
u'api_version': self.API_VERSION,
u'cpu': {u'soft_irq': u'8336',
u'system': u'52554',
u'total': 7503411,
u'user': u'252551'},
'cpu_count': os.cpu_count(),
u'disk': {u'available': 109079126016,
u'used': 25718685696},
u'haproxy_count': 5,

View File

@ -1626,6 +1626,57 @@ class TestHaproxyCfg(base.TestCase):
defaults=defaults, logging="\n"),
rendered_obj)
def test_render_template_amp_details(self):
j_cfg = jinja_cfg.JinjaTemplater(
base_amp_path='/var/lib/octavia',
base_crt_dir='/var/lib/octavia/certs',
connection_logging=False)
rendered_obj = j_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_amphora_tuple(),
[sample_configs_combined.sample_listener_tuple()],
amp_details={"cpu_count": 7,
"active_tuned_profiles": 'virtual-guest '
'optimize-serial-console '
'amphora'}
)
defaults = ("defaults\n"
" no log\n"
" retries 3\n"
" option redispatch\n"
" option splice-request\n"
" option splice-response\n"
" option http-keep-alive\n\n\n")
global_opts = (" maxconn 50000\n"
" nbthread 6\n"
" cpu-map auto:1/1-6 1-6\n")
self.assertEqual(
sample_configs_combined.sample_base_expected_config(
defaults=defaults, logging="\n", global_opts=global_opts),
rendered_obj)
def test_render_template_amp_details_cpu_count_none(self):
j_cfg = jinja_cfg.JinjaTemplater(
base_amp_path='/var/lib/octavia',
base_crt_dir='/var/lib/octavia/certs',
connection_logging=False)
rendered_obj = j_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_amphora_tuple(),
[sample_configs_combined.sample_listener_tuple()],
amp_details={"cpu_count": None},
)
defaults = ("defaults\n"
" no log\n"
" retries 3\n"
" option redispatch\n"
" option splice-request\n"
" option splice-response\n"
" option http-keep-alive\n\n\n")
global_opts = " maxconn 50000\n\n"
self.assertEqual(
sample_configs_combined.sample_base_expected_config(
defaults=defaults, logging="\n", global_opts=global_opts),
rendered_obj)
def test_haproxy_cfg_1_8_vs_1_5(self):
j_cfg = jinja_cfg.JinjaTemplater(
base_amp_path='/var/lib/octavia',
@ -1664,7 +1715,8 @@ class TestHaproxyCfg(base.TestCase):
sample_amphora,
[sample_proxy_listener],
tls_certs=None,
haproxy_versions=("1", "8", "1"))
haproxy_versions=("1", "8", "1"),
amp_details=None)
self.assertEqual(
sample_configs_combined.sample_base_expected_config(
global_opts=go, backend=be),
@ -1693,7 +1745,8 @@ class TestHaproxyCfg(base.TestCase):
sample_amphora,
[sample_proxy_listener],
tls_certs=None,
haproxy_versions=("1", "5", "18"))
haproxy_versions=("1", "5", "18"),
amp_details=None)
self.assertEqual(
sample_configs_combined.sample_base_expected_config(backend=be),
rendered_obj)
@ -1780,7 +1833,8 @@ class TestHaproxyCfg(base.TestCase):
sample_configs_combined.sample_amphora_tuple(),
[sample_listener],
tls_certs=None,
haproxy_versions=("1", "5", "18"))
haproxy_versions=("1", "5", "18"),
amp_details=None)
self.assertEqual(
sample_configs_combined.sample_base_expected_config(
frontend=fe, backend=be),
@ -1797,19 +1851,19 @@ class TestHaproxyCfg(base.TestCase):
j_cfg = jinja_cfg.JinjaTemplater()
j_cfg.build_config(mock_amp, mock_listeners, mock_tls_certs,
haproxy_versions=("0", "7", "0"),
socket_path=mock_socket_path)
socket_path=mock_socket_path, amp_details=None)
expected_fc = {}
mock_render_loadbalancer_obj.assert_called_once_with(
mock_amp, mock_listeners, tls_certs=mock_tls_certs,
socket_path=mock_socket_path,
socket_path=mock_socket_path, amp_details=None,
feature_compatibility=expected_fc)
mock_render_loadbalancer_obj.reset_mock()
j_cfg.build_config(mock_amp, mock_listeners, mock_tls_certs,
haproxy_versions=("1", "6", "0"),
socket_path=mock_socket_path)
socket_path=mock_socket_path, amp_details=None)
expected_fc = {
constants.HTTP_REUSE: True,
@ -1817,14 +1871,14 @@ class TestHaproxyCfg(base.TestCase):
}
mock_render_loadbalancer_obj.assert_called_once_with(
mock_amp, mock_listeners, tls_certs=mock_tls_certs,
socket_path=mock_socket_path,
socket_path=mock_socket_path, amp_details=None,
feature_compatibility=expected_fc)
mock_render_loadbalancer_obj.reset_mock()
j_cfg.build_config(mock_amp, mock_listeners, mock_tls_certs,
haproxy_versions=("1", "9", "0"),
socket_path=mock_socket_path)
socket_path=mock_socket_path, amp_details=None)
expected_fc = {
constants.HTTP_REUSE: True,
@ -1833,14 +1887,14 @@ class TestHaproxyCfg(base.TestCase):
}
mock_render_loadbalancer_obj.assert_called_once_with(
mock_amp, mock_listeners, tls_certs=mock_tls_certs,
socket_path=mock_socket_path,
socket_path=mock_socket_path, amp_details=None,
feature_compatibility=expected_fc)
mock_render_loadbalancer_obj.reset_mock()
j_cfg.build_config(mock_amp, mock_listeners, mock_tls_certs,
haproxy_versions=("2", "1", "1"),
socket_path=mock_socket_path)
socket_path=mock_socket_path, amp_details=None)
expected_fc = {
constants.HTTP_REUSE: True,
@ -1850,13 +1904,13 @@ class TestHaproxyCfg(base.TestCase):
}
mock_render_loadbalancer_obj.assert_called_once_with(
mock_amp, mock_listeners, tls_certs=mock_tls_certs,
socket_path=mock_socket_path,
socket_path=mock_socket_path, amp_details=None,
feature_compatibility=expected_fc)
mock_render_loadbalancer_obj.reset_mock()
j_cfg.build_config(mock_amp, mock_listeners, mock_tls_certs,
haproxy_versions=("2", "2", "1"),
haproxy_versions=("2", "2", "1"), amp_details=None,
socket_path=mock_socket_path)
expected_fc = {
@ -1868,27 +1922,27 @@ class TestHaproxyCfg(base.TestCase):
}
mock_render_loadbalancer_obj.assert_called_once_with(
mock_amp, mock_listeners, tls_certs=mock_tls_certs,
socket_path=mock_socket_path,
socket_path=mock_socket_path, amp_details=None,
feature_compatibility=expected_fc)
mock_render_loadbalancer_obj.reset_mock()
j_cfg.build_config(mock_amp, mock_listeners, mock_tls_certs,
haproxy_versions=("2", "4", "0"),
socket_path=mock_socket_path)
socket_path=mock_socket_path, amp_details=None)
mock_render_loadbalancer_obj.assert_called_once_with(
mock_amp, mock_listeners, tls_certs=mock_tls_certs,
socket_path=mock_socket_path,
socket_path=mock_socket_path, amp_details=None,
feature_compatibility=expected_fc)
mock_render_loadbalancer_obj.reset_mock()
j_cfg.build_config(mock_amp, mock_listeners, mock_tls_certs,
haproxy_versions=("3", "1", "0"),
socket_path=mock_socket_path)
socket_path=mock_socket_path, amp_details=None)
mock_render_loadbalancer_obj.assert_called_once_with(
mock_amp, mock_listeners, tls_certs=mock_tls_certs,
socket_path=mock_socket_path,
socket_path=mock_socket_path, amp_details=None,
feature_compatibility=expected_fc)

View File

@ -0,0 +1,9 @@
---
features:
- |
Amphora agent has been adjusted to complement the vertical scaling
optimizations implemented in the new cpu-pinning element. If the flavor
uses multiple vCPUs it will
configure HAProxy automatically to pin each of its worker threads
to an individual CPU that was isolated by the element (all vCPUs starting
from the second one).