Separate slow rpc operations

The RPCListener processes all jobs serially within a single
thread. However, some RPC calls, such as enqueue and dequeue, wait
until the action is complete, which can take multiple minutes if a
reconfiguration is in progress. During this time all of zuul-web is
blocked, since it relies on the RPC mechanism to get information from
the scheduler. This can be solved generally when working towards the
HA scheduler by coordinating through ZooKeeper. Until then, we can
separate long-running RPC calls into a different listener so that
status calls are not blocked by enqueue/dequeue.

Change-Id: Ie5b07b7913d3c88bd267801b2edf09c39fedbe79
This commit is contained in:
Tobias Henkel 2020-08-04 12:54:23 +02:00
parent e64fd8fff8
commit 978272b98d
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
2 changed files with 136 additions and 103 deletions

View File

@ -16,6 +16,8 @@
import json
import logging
import time
from abc import ABCMeta
from typing import List
from zuul import model
from zuul.connection import BaseConnection
@ -24,48 +26,24 @@ from zuul.lib.gearworker import ZuulGearWorker
from zuul.lib.jsonutil import ZuulJSONEncoder
class RPCListener(object):
log = logging.getLogger("zuul.RPCListener")
class RPCListenerBase(metaclass=ABCMeta):
log = logging.getLogger("zuul.RPCListenerBase")
thread_name = 'zuul-rpc-gearman-worker'
functions = [] # type: List[str]
def __init__(self, config, sched):
self.config = config
self.sched = sched
self.jobs = {}
functions = [
'autohold',
'autohold_delete',
'autohold_info',
'autohold_list',
'allowed_labels_get',
'dequeue',
'enqueue',
'enqueue_ref',
'promote',
'get_admin_tenants',
'get_running_jobs',
'get_job_log_stream_address',
'tenant_list',
'tenant_sql_connection',
'status_get',
'job_get',
'job_list',
'project_get',
'project_list',
'project_freeze_jobs',
'pipeline_list',
'key_get',
'config_errors_list',
'connection_list',
'authorize_user',
]
for func in functions:
for func in self.functions:
f = getattr(self, 'handle_%s' % func)
self.jobs['zuul:%s' % func] = f
self.gearworker = ZuulGearWorker(
'Zuul RPC Listener',
'zuul.RPCListener',
'zuul-rpc-gearman-worker',
self.log.name,
self.thread_name,
self.config,
self.jobs)
@ -80,6 +58,17 @@ class RPCListener(object):
def join(self):
    # Block until the underlying gear worker thread has exited.
    self.gearworker.join()
class RPCListenerSlow(RPCListenerBase):
    """RPC listener dedicated to long-running calls.

    Enqueue/dequeue/promote wait until the scheduler has completed the
    action, which can take minutes while a reconfiguration is in
    progress.  Running them on their own gear worker keeps them from
    blocking the fast status calls served by RPCListener.
    """
    log = logging.getLogger("zuul.RPCListenerSlow")
    thread_name = 'zuul-rpc-slow-gearman-worker'
    # Gearman function names registered by this listener; the base
    # class maps each name to its handle_<name> method.
    functions = [
        'dequeue',
        'enqueue',
        'enqueue_ref',
        'promote',
    ]
def handle_dequeue(self, job):
args = json.loads(job.arguments)
tenant_name = args['tenant']
@ -95,77 +84,6 @@ class RPCListener(object):
return
job.sendWorkComplete()
def handle_autohold_info(self, job):
    """Look up a single autohold request and return it as JSON."""
    payload = json.loads(job.arguments)
    request_id = payload['request_id']
    try:
        info = self.sched.autohold_info(request_id)
    except Exception as err:
        # Report scheduler failures back to the gearman client.
        job.sendWorkException(str(err).encode('utf8'))
        return
    job.sendWorkComplete(json.dumps(info))
def handle_autohold_delete(self, job):
    """Remove a previously created autohold request."""
    request_id = json.loads(job.arguments)['request_id']
    try:
        self.sched.autohold_delete(request_id)
    except Exception as err:
        # Surface the failure to the caller instead of crashing the worker.
        job.sendWorkException(str(err).encode('utf8'))
    else:
        job.sendWorkComplete()
def handle_autohold_list(self, job):
    """Return every current autohold request as a JSON document."""
    job.sendWorkComplete(json.dumps(self.sched.autohold_list()))
def handle_autohold(self, job):
    """Create an autohold request.

    Validates the tenant, project, change/ref and count arguments from
    the gearman job, then asks the scheduler to hold matching nodes.
    Any validation failure is reported to the client via
    sendWorkException and aborts the request.
    """
    args = json.loads(job.arguments)
    params = {}

    tenant = self.sched.abide.tenants.get(args['tenant'])
    if tenant:
        params['tenant_name'] = args['tenant']
    else:
        error = "Invalid tenant: %s" % args['tenant']
        job.sendWorkException(error.encode('utf8'))
        return

    (trusted, project) = tenant.getProject(args['project'])
    if project:
        params['project_name'] = project.canonical_name
    else:
        error = "Invalid project: %s" % args['project']
        job.sendWorkException(error.encode('utf8'))
        return

    if args['change'] and args['ref']:
        job.sendWorkException("Change and ref can't be both used "
                              "for the same request")
        # Bug fix: previously execution fell through after reporting
        # the conflict and created the hold anyway; abort instead.
        return

    if args['change']:
        # Convert change into ref based on zuul connection
        ref_filter = project.source.getRefForChange(args['change'])
    elif args['ref']:
        ref_filter = "%s" % args['ref']
    else:
        ref_filter = ".*"

    params['job_name'] = args['job']
    params['ref_filter'] = ref_filter
    params['reason'] = args['reason']

    if args['count'] < 0:
        error = "Invalid count: %d" % args['count']
        job.sendWorkException(error.encode('utf8'))
        return

    params['count'] = args['count']
    params['node_hold_expiration'] = args['node_hold_expiration']

    self.sched.autohold(**params)
    job.sendWorkComplete()
def _common_enqueue(self, job):
args = json.loads(job.arguments)
event = model.TriggerEvent()
@ -253,6 +171,116 @@ class RPCListener(object):
self.sched.promote(tenant_name, pipeline_name, change_ids)
job.sendWorkComplete()
class RPCListener(RPCListenerBase):
    """RPC listener for fast, non-blocking calls.

    Serves status/query style RPCs; the long-running
    enqueue/dequeue/promote calls live in RPCListenerSlow so they
    cannot block these.
    """
    log = logging.getLogger("zuul.RPCListener")
    thread_name = 'zuul-rpc-gearman-worker'
    # Gearman function names registered by this listener; the base
    # class maps each name to its handle_<name> method.
    functions = [
        'autohold',
        'autohold_delete',
        'autohold_info',
        'autohold_list',
        'allowed_labels_get',
        'get_admin_tenants',
        'get_running_jobs',
        'get_job_log_stream_address',
        'tenant_list',
        'tenant_sql_connection',
        'status_get',
        'job_get',
        'job_list',
        'project_get',
        'project_list',
        'project_freeze_jobs',
        'pipeline_list',
        'key_get',
        'config_errors_list',
        'connection_list',
        'authorize_user',
    ]
def start(self):
    # Start the gear worker thread that serves this listener's jobs.
    self.gearworker.start()
def stop(self):
    # Request shutdown of the gear worker; join() waits for the exit.
    self.log.debug("Stopping")
    self.gearworker.stop()
    self.log.debug("Stopped")
def join(self):
    # Wait for the gear worker thread to exit.
    # NOTE(review): appears to duplicate the base class join() —
    # confirm and consider removing.
    self.gearworker.join()
def handle_autohold_info(self, job):
    """Return details of a single autohold request as JSON."""
    args = json.loads(job.arguments)
    request_id = args['request_id']
    try:
        data = self.sched.autohold_info(request_id)
    except Exception as e:
        # Propagate scheduler errors to the gearman client.
        job.sendWorkException(str(e).encode('utf8'))
        return
    job.sendWorkComplete(json.dumps(data))
def handle_autohold_delete(self, job):
    """Delete the autohold request identified by request_id."""
    args = json.loads(job.arguments)
    request_id = args['request_id']
    try:
        self.sched.autohold_delete(request_id)
    except Exception as e:
        # Propagate scheduler errors to the gearman client.
        job.sendWorkException(str(e).encode('utf8'))
        return
    job.sendWorkComplete()
def handle_autohold_list(self, job):
    """Return all current autohold requests as JSON."""
    data = self.sched.autohold_list()
    job.sendWorkComplete(json.dumps(data))
def handle_autohold(self, job):
    """Create an autohold request.

    Validates the tenant, project, change/ref and count arguments from
    the gearman job, then asks the scheduler to hold matching nodes.
    Any validation failure is reported to the client via
    sendWorkException and aborts the request.
    """
    args = json.loads(job.arguments)
    params = {}

    tenant = self.sched.abide.tenants.get(args['tenant'])
    if tenant:
        params['tenant_name'] = args['tenant']
    else:
        error = "Invalid tenant: %s" % args['tenant']
        job.sendWorkException(error.encode('utf8'))
        return

    (trusted, project) = tenant.getProject(args['project'])
    if project:
        params['project_name'] = project.canonical_name
    else:
        error = "Invalid project: %s" % args['project']
        job.sendWorkException(error.encode('utf8'))
        return

    if args['change'] and args['ref']:
        job.sendWorkException("Change and ref can't be both used "
                              "for the same request")
        # Bug fix: previously execution fell through after reporting
        # the conflict and created the hold anyway; abort instead.
        return

    if args['change']:
        # Convert change into ref based on zuul connection
        ref_filter = project.source.getRefForChange(args['change'])
    elif args['ref']:
        ref_filter = "%s" % args['ref']
    else:
        ref_filter = ".*"

    params['job_name'] = args['job']
    params['ref_filter'] = ref_filter
    params['reason'] = args['reason']

    if args['count'] < 0:
        error = "Invalid count: %d" % args['count']
        job.sendWorkException(error.encode('utf8'))
        return

    params['count'] = args['count']
    params['node_hold_expiration'] = args['node_hold_expiration']

    self.sched.autohold(**params)
    job.sendWorkComplete()
def handle_get_running_jobs(self, job):
# args = json.loads(job.arguments)
# TODO: use args to filter by pipeline etc

View File

@ -309,6 +309,7 @@ class Scheduler(threading.Thread):
self.connections = None
self.statsd = get_statsd(config)
self.rpc = rpclistener.RPCListener(config, self)
self.rpc_slow = rpclistener.RPCListenerSlow(config, self)
self.repl = None
self.stats_thread = threading.Thread(target=self.runStats)
self.stats_thread.daemon = True
@ -369,6 +370,7 @@ class Scheduler(threading.Thread):
self.command_thread.start()
self.rpc.start()
self.rpc_slow.start()
self.stats_thread.start()
def stop(self):
@ -379,6 +381,8 @@ class Scheduler(threading.Thread):
self.stats_thread.join()
self.rpc.stop()
self.rpc.join()
self.rpc_slow.stop()
self.rpc_slow.join()
self.stop_repl()
self._command_running = False
self.command_socket.stop()
@ -428,6 +432,7 @@ class Scheduler(threading.Thread):
if not self.statsd:
return
functions = getGearmanFunctions(self.rpc.gearworker.gearman)
functions.update(getGearmanFunctions(self.rpc_slow.gearworker.gearman))
executors_accepting = 0
executors_online = 0
execute_queue = 0