Add a zkprofile scheduler debug command

This adds a temporary debug command to the zuul-scheduler process.

It allows an operator to enable detailed ZK request profiling for
a given tenant-pipeline.  This will be used to identify opportunities
for further optimization.

Change-Id: Id6e0ee3ffc78874e91ebcdbfe14269c93af958cd
This commit is contained in:
James E. Blair
2022-10-27 09:23:49 -07:00
parent 45c04e4c69
commit eb7e0998e6
3 changed files with 77 additions and 0 deletions

View File

@@ -15,6 +15,8 @@
import configparser
import gc
import json
import logging
import os
import re
import shutil
@@ -508,6 +510,35 @@ class TestScheduler(ZuulTestCase):
for build in self.history:
self.assertTrue(build.parameters['zuul']['voting'])
def test_zk_profile(self):
command_socket = self.scheds.first.sched.config.get(
'scheduler', 'command_socket')
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
zplog = logging.getLogger('zuul.profile')
with self.assertLogs('zuul.profile', level='DEBUG') as logs:
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# assertNoLogs doesn't appear until py3.10, so we need to
# emit a single log line in order to assert that there
# aren't any others.
zplog.debug('test')
self.assertEqual(1, len(logs.output))
args = json.dumps(['tenant-one', 'check'])
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.connect(command_socket)
s.sendall(f'zkprofile {args}\n'.encode('utf8'))
with self.assertLogs('zuul.profile', level='DEBUG'):
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.connect(command_socket)
s.sendall(f'zkprofile {args}\n'.encode('utf8'))
with self.assertLogs('zuul.profile', level='DEBUG') as logs:
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
zplog.debug('test')
self.assertEqual(1, len(logs.output))
def test_initial_pipeline_gauges(self):
"Test that each pipeline reported its length on start"
self.assertReportedStat('zuul.tenant.tenant-one.pipeline.gate.'

View File

@@ -137,10 +137,22 @@ class TenantReconfigureCommand(commandsocket.Command):
args = [TenantArgument]
class PipelineArgument(commandsocket.Argument):
name = 'pipeline'
help = 'The name of the pipeline'
class ZKProfileCommand(commandsocket.Command):
name = 'zkprofile'
help = 'Enable ZK profiling for a pipeline'
args = [TenantArgument, PipelineArgument]
COMMANDS = [
FullReconfigureCommand,
SmartReconfigureCommand,
TenantReconfigureCommand,
ZKProfileCommand,
commandsocket.StopCommand,
commandsocket.ReplCommand,
commandsocket.NoReplCommand,
@@ -192,6 +204,7 @@ class Scheduler(threading.Thread):
testonly=False):
threading.Thread.__init__(self)
self.daemon = True
self._profile_pipelines = set()
self.wait_for_init = wait_for_init
self.hostname = socket.getfqdn()
self.tracing = tracing.Tracing(config)
@@ -211,6 +224,7 @@ class Scheduler(threading.Thread):
SmartReconfigureCommand.name: self.smartReconfigureCommandHandler,
TenantReconfigureCommand.name:
self.tenantReconfigureCommandHandler,
ZKProfileCommand.name: self.zkProfileCommandHandler,
commandsocket.StopCommand.name: self.stop,
commandsocket.ReplCommand.name: self.startRepl,
commandsocket.NoReplCommand.name: self.stopRepl,
@@ -897,6 +911,11 @@ class Scheduler(threading.Thread):
def tenantReconfigureCommandHandler(self, tenant_name):
self._zuul_app.tenantReconfigure([tenant_name])
def zkProfileCommandHandler(self, tenant_name, pipeline_name):
key = set([(tenant_name, pipeline_name)])
self._profile_pipelines = self._profile_pipelines ^ key
self.log.debug("Now profiling %s", self._profile_pipelines)
def startRepl(self):
if self.repl:
return
@@ -2155,10 +2174,14 @@ class Scheduler(threading.Thread):
stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
ctx = pipeline.manager.current_context
if (tenant.name, pipeline.name) in self._profile_pipelines:
ctx.profile = True
with self.statsd_timer(f'{stats_key}.refresh'):
pipeline.change_list.refresh(ctx)
pipeline.summary.refresh(ctx)
pipeline.state.refresh(ctx)
if (tenant.name, pipeline.name) in self._profile_pipelines:
ctx.profile = False
pipeline.state.setDirty(self.zk_client.client)
if pipeline.state.old_queues:

View File

@@ -14,6 +14,7 @@
import contextlib
import json
import logging
import time
import types
import zlib
@@ -26,6 +27,9 @@ from zuul.zk import ZooKeeperClient
class ZKContext:
profile_logger = logging.getLogger('zuul.profile')
profile_default = False
def __init__(self, zk_client, lock, stop_event, log):
if isinstance(zk_client, ZooKeeperClient):
client = zk_client.client
@@ -44,6 +48,7 @@ class ZKContext:
self.cumulative_read_bytes = 0
self.cumulative_write_bytes = 0
self.build_references = False
self.profile = self.profile_default
def sessionIsValid(self):
return ((not self.lock or self.lock.is_still_valid()) and
@@ -62,6 +67,18 @@ class ZKContext:
self.cumulative_read_bytes += other.cumulative_read_bytes
self.cumulative_write_bytes += other.cumulative_write_bytes
def profileEvent(self, etype, path):
if not self.profile:
return
self.profile_logger.debug(
'ZK 0x%x %s %s '
'rt=%s wt=%s ro=%s wo=%s rn=%s wn=%s rb=%s wb=%s',
id(self), etype, path,
self.cumulative_read_time, self.cumulative_write_time,
self.cumulative_read_objects, self.cumulative_write_objects,
self.cumulative_read_znodes, self.cumulative_write_znodes,
self.cumulative_read_bytes, self.cumulative_write_bytes)
class LocalZKContext:
"""A Local ZKContext that means don't actually write anything to ZK"""
@@ -201,6 +218,7 @@ class ZKObject:
try:
self._retry(context, context.client.delete,
path, recursive=True)
context.profileEvent('delete', path)
return
except Exception:
context.log.error(
@@ -283,6 +301,7 @@ class ZKObject:
try:
compressed_data, zstat = self._retry(context, self._retryableLoad,
context, path)
context.profileEvent('get', path)
except Exception:
context.log.error(
"Exception loading ZKObject %s at %s", self, path)
@@ -332,6 +351,7 @@ class ZKObject:
zstat = self._retry(context, self._retryableSave,
context, create, path, compressed_data,
version)
context.profileEvent('set', path)
except Exception:
context.log.error(
"Exception saving ZKObject %s at %s", self, path)
@@ -385,6 +405,7 @@ class ShardedZKObject(ZKObject):
self._set(_zkobject_hash=None)
data, compressed_size = self._retry(context, self._retryableLoad,
context, path)
context.profileEvent('get', path)
self._set(**self.deserialize(data, context))
self._set(_zkobject_hash=hash(data),
_zkobject_compressed_size=compressed_size,
@@ -421,10 +442,12 @@ class ShardedZKObject(ZKObject):
try:
if create and not self.truncate_on_create:
exists = self._retry(context, context.client.exists, path)
context.profileEvent('exists', path)
if exists is not None:
raise NodeExistsError
compressed_size = self._retry(context, self._retryableSave,
context, path, data)
context.profileEvent('set', path)
self._set(_zkobject_hash=hash(data),
_zkobject_compressed_size=compressed_size,
_zkobject_uncompressed_size=len(data),