Add readiness/liveness probes to prometheus server

To facilitate automation of rolling restarts, configure the prometheus
server to answer readiness and liveness probes.  We are 'live' if the
process is running, and we are 'ready' if our component state is
either running or paused (not initializing or stopped).

The prometheus_client library doesn't support this directly, so we need
to handle this ourselves.  We could create yet another HTTP server that
each component would need to start, or we could take advantage of the
fact that the prometheus_client is a standard WSGI service and just
wrap it in our own WSGI service that adds the extra endpoints needed.
Since that is far simpler and less resounce intensive, that is what
this change does.

The prometheus_client will actually return the metrics on any path
given to it.  In order to reduce the chances of an operator configuring
a liveness probe with a typo (eg '/healthy/ready') and getting the
metrics page served with a 200 response, we restrict the metrics to
only the '/metrics' URI which is what we specified in our documentation,
and also '/' which is very likely accidentally used by users.

Change-Id: I154ca4896b69fd52eda655209480a75c8d7dbac3
This commit is contained in:
James E. Blair 2021-12-08 15:42:39 -08:00
parent 9dba1af7a9
commit 704fef6cb9
16 changed files with 230 additions and 50 deletions

View File

@ -554,10 +554,6 @@ prometheus's target.
Configuration
~~~~~~~~~~~~~
Prometheus support uses the ``prometheus_client`` python module.
Note that support is optional and Zuul will start without
the prometheus python module present.
To enable the service, set the ``prometheus_port`` in a service section of
``zuul.conf``. For example setting :attr:`scheduler.prometheus_port` to 9091
starts a HTTP server to expose metrics to a prometheus services at:
@ -583,3 +579,27 @@ These metrics are exposed by default:
.. stat:: process_cpu_seconds_total
:type: counter
.. _prometheus_liveness:
Liveness Probes
~~~~~~~~~~~~~~~
The Prometheus server also supports liveness and ready probes at the
following URIS:
.. path:: /health/live
Returns 200 as long as the process is running.
.. path:: /health/ready
Returns 200 if the process is in `RUNNING` or `PAUSED` states.
Otherwise, returns 503. Note that 503 is returned for
`INITIALIZED`, so this may be used to determine when a component
has completely finished loading configuration.
.. path:: /health/status
This always returns 200, but includes the component status as the
text body of the response.

View File

@ -0,0 +1,11 @@
---
features:
- |
The optional prometheus service now also includes endpoints for
readiness and liveness checks. See :ref:`prometheus_liveness` for
details.
upgrade:
- |
The prometheus endpoint would previously serve metrics at any URI.
It now only returns metrics on the `/` and `/metrics` URI. The
latter is recommended.

View File

@ -65,7 +65,6 @@ import testtools.content_type
from git.exc import NoSuchPathError
import yaml
import paramiko
import prometheus_client.exposition
import sqlalchemy
import requests_mock
@ -950,26 +949,6 @@ class FakeGerritChange(object):
self.reported += 1
class PrometheusServer(object):
def start(self):
app = prometheus_client.make_wsgi_app(prometheus_client.REGISTRY)
self.httpd = prometheus_client.exposition.make_server(
"0.0.0.0",
0,
app,
prometheus_client.exposition.ThreadingWSGIServer,
handler_class=prometheus_client.exposition._SilentHandler)
self.port = self.httpd.socket.getsockname()[1]
self.thread = threading.Thread(target=self.httpd.serve_forever)
self.thread.daemon = True
self.thread.start()
def stop(self):
self.httpd.shutdown()
self.thread.join()
self.httpd.socket.close()
class GerritWebServer(object):
def __init__(self, fake_gerrit):
@ -4462,10 +4441,6 @@ class ZuulTestCase(BaseTestCase):
server that all of the Zuul components in this test use to
communicate with each other.
:ivar PrometheusServer prometheus_server: An instance of
:py:class: ~test.base.PrometheusServer` which is the Prometheus
metrics endpoint.
:ivar RecordingExecutorServer executor_server: An instance of
:py:class:`~tests.base.RecordingExecutorServer` which is the
Ansible execute server used to run jobs for this test.
@ -4587,8 +4562,6 @@ class ZuulTestCase(BaseTestCase):
self.statsd.start()
self.gearman_server = FakeGearmanServer(self.use_ssl)
self.prometheus_server = PrometheusServer()
self.prometheus_server.start()
self.config.set('gearman', 'port', str(self.gearman_server.port))
self.log.info("Gearman server on port %s" %
@ -5045,7 +5018,6 @@ class ZuulTestCase(BaseTestCase):
self.statsd.join()
self.rpcclient.shutdown()
self.gearman_server.shutdown()
self.prometheus_server.stop()
self.fake_nodepool.stop()
self.zk_client.disconnect()
self.printHistory()

41
tests/fixtures/zuul-prometheus.conf vendored Normal file
View File

@ -0,0 +1,41 @@
[gearman]
server=127.0.0.1
[statsd]
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
# see: https://github.com/jsocol/pystatsd/issues/61
server=127.0.0.1
[scheduler]
tenant_config=main.yaml
relative_priority=true
prometheus_port=0
[merger]
git_dir=/tmp/zuul-test/merger-git
git_user_email=zuul@example.com
git_user_name=zuul
[executor]
git_dir=/tmp/zuul-test/executor-git
load_multiplier=100
[connection gerrit]
driver=gerrit
server=review.example.com
user=jenkins
sshkey=fake_id_rsa_path
[connection smtp]
driver=smtp
server=localhost
port=25
default_from=zuul@example.com
default_to=you@example.com
[database]
dburi=$MYSQL_FIXTURE_DBURI$
[web]
static_cache_expiry=1200
root=https://zuul.example.com/

View File

@ -18,12 +18,18 @@ from tests.base import ZuulTestCase
class BaseTestPrometheus(ZuulTestCase):
config_file = 'zuul-prometheus.conf'
tenant_config_file = 'config/single-tenant/main.yaml'
def get_metrics(self):
r = requests.get(
"http://localhost:%d" % self.prometheus_server.port)
def get_path(self, path):
return requests.get(
"http://localhost:%d%s" % (
self.scheds.first.sched.monitoring_server.port,
path))
def get_metrics(self, path=''):
metrics = {}
r = self.get_path(path)
for line in r.text.split('\n'):
if not line or line.startswith("#"):
continue
@ -40,3 +46,28 @@ class TestPrometheus(BaseTestPrometheus):
metrics = self.get_metrics()
self.assertIn("process_resident_memory_bytes", metrics)
self.assertIn("process_open_fds", metrics)
metrics = self.get_metrics('/metrics')
self.assertIn("process_resident_memory_bytes", metrics)
self.assertIn("process_open_fds", metrics)
def test_health(self):
r = self.get_path('/health/live')
self.assertEqual(r.status_code, 200)
r = self.get_path('/health/ready')
self.assertEqual(r.status_code, 200)
r = self.get_path('/health/status')
self.assertEqual(r.status_code, 200)
self.assertEqual(r.text, 'RUNNING')
r = self.get_path('/dne')
self.assertEqual(r.status_code, 404)
self.scheds.first.sched.component_info.state = \
self.scheds.first.sched.component_info.INITIALIZING
r = self.get_path('/health/live')
self.assertEqual(r.status_code, 200)
r = self.get_path('/health/ready')
self.assertEqual(r.status_code, 503)
r = self.get_path('/health/status')
self.assertEqual(r.status_code, 200)
self.assertEqual(r.text, 'INITIALIZING')

View File

@ -28,7 +28,6 @@ import sys
import traceback
import threading
prometheus_client = extras.try_import('prometheus_client')
yappi = extras.try_import('yappi')
objgraph = extras.try_import('objgraph')
@ -198,15 +197,6 @@ class ZuulDaemonApp(ZuulApp, metaclass=abc.ABCMeta):
"Configured logging: {version}".format(
version=zuul_version_info.release_string()))
def setup_prometheus(self, section):
if self.config.has_option(section, 'prometheus_port'):
if not prometheus_client:
raise RuntimeError("prometheus_client library is missing.")
port = int(self.config.get(section, 'prometheus_port'))
addr = get_default(
self.config, section, 'prometheus_addr', '0.0.0.0')
prometheus_client.start_http_server(port, addr)
def main(self):
self.parseArguments()
self.readConfig()

View File

@ -98,7 +98,6 @@ class Executor(zuul.cmd.ZuulDaemonApp):
os.mkdir(self.job_dir)
self.setup_logging('executor', 'log_config')
self.setup_prometheus('executor')
self.log = logging.getLogger("zuul.Executor")
self.finger_port = int(

View File

@ -50,7 +50,6 @@ class Merger(zuul.cmd.ZuulDaemonApp):
self.configure_connections(source_only=True)
self.setup_logging('merger', 'log_config')
self.setup_prometheus('merger')
self.merger = MergeServer(self.config, self.connections)
self.merger.start()

View File

@ -133,7 +133,6 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.start_gear_server()
self.setup_logging('scheduler', 'log_config')
self.setup_prometheus('scheduler')
self.log = logging.getLogger("zuul.Scheduler")
self.configure_connections(require_sql=True)

View File

@ -85,7 +85,6 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
sys.exit(0)
self.setup_logging('web', 'log_config')
self.setup_prometheus('web')
self.log = logging.getLogger("zuul.WebServer")
try:

View File

@ -41,6 +41,7 @@ from zuul.lib.result_data import get_warnings_from_result_data
from zuul.lib import yamlutil as yaml
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
from zuul.lib.monitoring import MonitoringServer
from zuul.lib.statsd import get_statsd
from zuul.lib import filecomments
from zuul.lib.keystorage import KeyStorage
@ -3162,6 +3163,9 @@ class ExecutorServer(BaseMergeServer):
self.component_info = ExecutorComponent(
self.zk_client, self.hostname, version=get_version_string())
self.component_info.register()
self.monitoring_server = MonitoringServer(self.config, 'executor',
self.component_info)
self.monitoring_server.start()
self.log_streaming_port = log_streaming_port
self.governor_lock = threading.Lock()
self.run_lock = threading.Lock()
@ -3450,6 +3454,7 @@ class ExecutorServer(BaseMergeServer):
# that all ZK related components can be stopped first.
super().stop()
self.stop_repl()
self.monitoring_server.stop()
self.log.debug("Stopped")
def join(self):
@ -3461,6 +3466,7 @@ class ExecutorServer(BaseMergeServer):
self.build_loop_wake_event.set()
self.build_worker.join()
self.command_thread.join()
self.monitoring_server.join()
def pause(self):
self.log.debug('Pausing')

View File

@ -24,6 +24,7 @@ from zuul.exceptions import StreamingError
from zuul.lib import streamer_utils
from zuul.lib.commandsocket import CommandSocket
from zuul.lib.config import get_default
from zuul.lib.monitoring import MonitoringServer
from zuul.version import get_version_string
from zuul.zk import ZooKeeperClient
from zuul.zk.components import ComponentRegistry, FingerGatewayComponent
@ -187,6 +188,10 @@ class FingerGateway(object):
self.component_info.use_ssl = True
self.component_info.register()
self.monitoring_server = MonitoringServer(config, 'fingergw',
self.component_info)
self.monitoring_server.start()
self.component_registry = ComponentRegistry(self.zk_client)
self.executor_api = ExecutorApi(self.zk_client, use_cache=False)
@ -271,6 +276,7 @@ class FingerGateway(object):
self.log.exception("Error stopping command socket:")
self.zk_client.disconnect()
self.monitoring_server.stop()
self.log.info("Finger gateway is stopped")
@ -282,3 +288,4 @@ class FingerGateway(object):
if self.command_thread:
self.command_thread.join()
self.monitoring_server.join()

85
zuul/lib/monitoring.py Normal file
View File

@ -0,0 +1,85 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import prometheus_client
from zuul.lib.config import get_default
class MonitoringServer:
def __init__(self, config, section, component_info):
if not config.has_option(section, 'prometheus_port'):
self.httpd = None
return
self.component_info = component_info
port = int(config.get(section, 'prometheus_port'))
addr = get_default(
config, section, 'prometheus_addr', '0.0.0.0')
self.prometheus_app = prometheus_client.make_wsgi_app(
prometheus_client.registry.REGISTRY)
self.httpd = prometheus_client.exposition.make_server(
addr, port,
self.handleRequest,
prometheus_client.exposition.ThreadingWSGIServer,
handler_class=prometheus_client.exposition._SilentHandler)
# The unit tests pass in 0 for the port
self.port = self.httpd.socket.getsockname()[1]
def start(self):
if self.httpd is None:
return
self.thread = threading.Thread(target=self.httpd.serve_forever)
self.thread.daemon = True
self.thread.start()
def stop(self):
if self.httpd is None:
return
self.httpd.shutdown()
def join(self):
if self.httpd is None:
return
self.thread.join()
self.httpd.socket.close()
def handleRequest(self, environ, start_response):
headers = []
output = b''
if environ['PATH_INFO'] == '/health/live':
status = '200 OK'
elif environ['PATH_INFO'] == '/health/ready':
if (self.component_info.state in (
self.component_info.RUNNING,
self.component_info.PAUSED)):
status = '200 OK'
else:
status = '503 Service Unavailable'
elif environ['PATH_INFO'] == '/health/status':
status = '200 OK'
headers = [('Content-Type', 'text/plain')]
output = str(self.component_info.state).encode('utf8').upper()
elif environ['PATH_INFO'] in ('/metrics', '/'):
# The docs say '/metrics' but '/' worked and was likely
# used by users, so let's support both for now.
return self.prometheus_app(environ, start_response)
else:
status = '404 Not Found'
# Return output
start_response(status, headers)
return [output]

View File

@ -32,6 +32,7 @@ from zuul.merger.merger import nullcontext
from zuul.model import (
FilesChangesCompletedEvent, MergeCompletedEvent, MergeRequest
)
from zuul.lib.monitoring import MonitoringServer
from zuul.version import get_version_string
from zuul.zk import ZooKeeperClient
from zuul.zk.components import MergerComponent
@ -470,6 +471,10 @@ class MergeServer(BaseMergeServer):
self.zk_client, self.hostname, version=get_version_string())
self.component_info.register()
self.monitoring_server = MonitoringServer(self.config, 'merger',
self.component_info)
self.monitoring_server.start()
self.command_map = dict(
stop=self.stop,
pause=self.pause,
@ -499,10 +504,12 @@ class MergeServer(BaseMergeServer):
super().stop()
self._command_running = False
self.command_socket.stop()
self.monitoring_server.stop()
self.log.debug("Stopped")
def join(self):
super().join()
self.monitoring_server.join()
def pause(self):
self.log.debug('Pausing')

View File

@ -36,6 +36,7 @@ from zuul.lib.ansible import AnsibleManager
from zuul.lib.config import get_default
from zuul.lib.keystorage import KeyStorage
from zuul.lib.logutil import get_annotated_logger
from zuul.lib.monitoring import MonitoringServer
from zuul.lib.queue import NamedQueue
from zuul.lib.times import Times
from zuul.lib.statsd import get_statsd, normalize_statsd_name
@ -197,6 +198,10 @@ class Scheduler(threading.Thread):
self.wake_event.set)
self.unparsed_config_cache = UnparsedConfigCache(self.zk_client)
self.monitoring_server = MonitoringServer(self.config, 'scheduler',
self.component_info)
self.monitoring_server.start()
# TODO (swestphahl): Remove after we've refactored reconfigurations
# to be performed on the tenant level.
self.reconfigure_event_queue = NamedQueue("ReconfigureEventQueue")
@ -319,6 +324,9 @@ class Scheduler(threading.Thread):
self.command_thread.join()
self.log.debug("Stopping timedb thread")
self.times.join()
self.log.debug("Stopping monitoring server")
self.monitoring_server.stop()
self.monitoring_server.join()
self.zk_client.disconnect()
def runCommand(self):

View File

@ -39,6 +39,7 @@ from zuul.lib import commandsocket, encryption, streamer_utils
from zuul.lib.ansible import AnsibleManager
from zuul.lib.jsonutil import ZuulJSONEncoder
from zuul.lib.keystorage import KeyStorage
from zuul.lib.monitoring import MonitoringServer
from zuul.lib.re2util import filter_allowed_disallowed
from zuul.model import (
Abide,
@ -1618,6 +1619,10 @@ class ZuulWeb(object):
self.zk_client, self.hostname, version=get_version_string())
self.component_info.register()
self.monitoring_server = MonitoringServer(self.config, 'web',
self.component_info)
self.monitoring_server.start()
self.component_registry = ComponentRegistry(self.zk_client)
self.system_config_cache_wake_event = threading.Event()
@ -1870,10 +1875,11 @@ class ZuulWeb(object):
self.stop_repl()
self._command_running = False
self.command_socket.stop()
self.command_thread.join()
self.monitoring_server.stop()
def join(self):
self.command_thread.join()
self.monitoring_server.join()
def runCommand(self):
while self._command_running: