Reduce ZK lock contention in executor

Now that the job request queue uses the tree cache, store
information on whether or not each request is locked on our
cached objects.  Use that to avoid attempting to lock a request
that our cache thinks is already locked.

This should reduce executor contention somewhat since as soon as
one executor locks a request, other executors can avoid
contributing to the thundering herd of lock attempts.

Change-Id: I0317eff129622858285359d3a857044a8317ccf5
This commit is contained in:
James E. Blair
2024-12-09 07:34:48 -08:00
parent 317e42a2e5
commit d42b45a83f
4 changed files with 106 additions and 3 deletions

View File

@@ -900,6 +900,98 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
a = reqs[0]
self.assertEqual(a.uuid, 'A')
def test_unlock_request(self):
# Test that locking and unlocking works
request_queue = queue.Queue()
event_queue = queue.Queue()
# A callback closure for the request queue
def rq_put():
request_queue.put(None)
# and the event queue
def eq_put(br, e):
event_queue.put((br, e))
# Simulate the client side
client = ExecutorApi(self.zk_client)
# Simulate the server side
server = ExecutorApi(self.zk_client,
build_request_callback=rq_put,
build_event_callback=eq_put)
# Scheduler submits request
request = BuildRequest(
"A", None, None, "job", "job_uuid", "tenant", "pipeline", '1')
client.submit(request, {'job': 'test'})
request_queue.get(timeout=30)
# Executor receives request
reqs = list(server.next())
self.assertEqual(len(reqs), 1)
a = reqs[0]
# Get a client copy of the request for later. Normally the
# client does not lock requests, but we will use this to
# simulate a second executor attempting to lock a request
# while our first executor operates. This ensures the lock
# contender counting works correctly.
client_a = self.getRequest(client, a.uuid)
# Executor locks request
self.assertTrue(server.lock(a, blocking=False))
# Someone else attempts to lock it
t = threading.Thread(target=client.lock, args=(client_a, True))
t.start()
# Wait for is_locked to be updated and both lock contenders to
# show in the cache:
for _ in iterate_timeout(30, "lock to propagate"):
r1 = self.getRequest(server, a.uuid)
r2 = self.getRequest(client, a.uuid)
if (r1.is_locked and
r2.is_locked and
r1.lock_contenders == 2 and
r2.lock_contenders == 2):
break
# Should see no pending requests
reqs = list(server.next())
self.assertEqual(len(reqs), 0)
reqs = list(client.next())
self.assertEqual(len(reqs), 0)
# Unlock request
server.unlock(a)
# Wait for client to get lock
t.join()
# Wait for lock_contenders to be updated:
for _ in iterate_timeout(30, "lock to propagate"):
r1 = self.getRequest(server, a.uuid)
r2 = self.getRequest(client, a.uuid)
if r1.lock_contenders == r2.lock_contenders == 1:
break
# Release client lock
client.unlock(client_a)
# Wait for is_locked to be updated:
for _ in iterate_timeout(30, "lock to propagate"):
r1 = self.getRequest(server, a.uuid)
r2 = self.getRequest(client, a.uuid)
if not r1.is_locked and not r2.is_locked:
break
# Should see pending requests
reqs = list(server.next())
self.assertEqual(len(reqs), 1)
reqs = list(client.next())
self.assertEqual(len(reqs), 1)
client.remove(a)
class TestMergerApi(ZooKeeperBaseTestCase):
def _assertEmptyRoots(self, client):

View File

@@ -4834,6 +4834,7 @@ class JobRequest:
self._zstat = None
self.lock = None
self.is_locked = False
self.lock_contenders = 0
self.thread_lock = threading.Lock()
def toDict(self):

View File

@@ -125,7 +125,8 @@ class ExecutorApi:
for queue in self.zone_queues.values():
request2 = queue.getRequest(request.uuid)
if (request2 and
request2.state == BuildRequest.REQUESTED):
request2.state == BuildRequest.REQUESTED and
not request2.is_locked):
yield request2
break

View File

@@ -153,7 +153,13 @@ class JobRequestCache(ZuulTreeCache):
request = self.getRequest(parts[1])
if not request:
return
request.is_locked = exists
if len(parts) < 3:
return
if exists:
request.lock_contenders += 1
else:
request.lock_contenders -= 1
request.is_locked = bool(request.lock_contenders)
def objectFromRaw(self, key, data, stat):
if key[0] == 'requests':
@@ -264,7 +270,8 @@ class JobRequestQueue:
for request in self.inState(self.request_class.REQUESTED):
request = self.cache.getRequest(request.uuid)
if (request and
request.state == self.request_class.REQUESTED):
request.state == self.request_class.REQUESTED and
not request.is_locked):
yield request
def submit(self, request, params, needs_result=False):
@@ -493,6 +500,8 @@ class JobRequestQueue:
states = (self.request_class.RUNNING,)
for req in self.inState(*states):
try:
if req.is_locked:
continue
if self.isLocked(req):
continue
# It may have completed in the interim, so double