Remove unneeded scheduler.zk_nodepool object

The scheduler has a Nodeool object, and the Nodepool object has
a ZooKeeperNodepool object.  Separately, the scheduler also has a
standalone ZooKeeperNodepool object.  Rather than having a second
zk_nodepool object, just reach into Nodepool object and use its
zk_nodepool object directly.

This is more important now that ZooKeeperNodepool maintains a
node request cache (and will also maintain a node cache in a future
change).  This means that the scheduler was keeping two in-memory
caches, which is extra work being performed.

Because one of the zk_nodepool objects was being used to generate
nodes provisioned events, and the other was being used to process
them, if their caches weren't in sync, the scheduler could end up
marking node requests as failed when they actually succeeded.

The dual cache issue is why we saw this issue in tests, but the
same issue would be present with multiple schedulers too, so we
also update the getNodeRequest method to make the cache optional.
We bypass the cache where we must be certain we have the most
up to date info.

Change-Id: I89242a01f656abce143bfb991670d452deae8b72
This commit is contained in:
James E. Blair 2021-09-10 07:34:43 -07:00
parent 267571a892
commit 678bc4846c
5 changed files with 42 additions and 35 deletions

View File

@ -5126,6 +5126,10 @@ class ZuulTestCase(BaseTestCase):
def release(self, job):
job.release()
@property
def sched_zk_nodepool(self):
return self.scheds.first.sched.nodepool.zk_nodepool
@property
def hold_jobs_in_queue(self):
return self.executor_api.hold_in_queue

View File

@ -333,9 +333,9 @@ class TestSchedulerAutoholdHoldExpiration(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
request_list = self.sched_zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request = self.sched_zk_nodepool.getHoldRequest(
request_list[0])
self.assertIsNotNone(request)
self.assertEqual('tenant-one', request.tenant)
@ -364,9 +364,9 @@ class TestSchedulerAutoholdHoldExpiration(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
request_list = self.sched_zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request = self.sched_zk_nodepool.getHoldRequest(
request_list[0])
self.assertIsNotNone(request)
self.assertEqual('tenant-one', request.tenant)
@ -396,9 +396,9 @@ class TestSchedulerAutoholdHoldExpiration(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
request_list = self.sched_zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request = self.sched_zk_nodepool.getHoldRequest(
request_list[0])
self.assertIsNotNone(request)
self.assertEqual('tenant-one', request.tenant)
@ -1931,9 +1931,9 @@ class TestScheduler(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
request_list = self.sched_zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request = self.sched_zk_nodepool.getHoldRequest(
request_list[0])
self.assertIsNotNone(request)
self.assertEqual('tenant-one', request.tenant)
@ -2023,7 +2023,7 @@ class TestScheduler(ZuulTestCase):
# The hold request current_count should have incremented
# and we should have recorded the held node ID.
request2 = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request2 = self.sched_zk_nodepool.getHoldRequest(
request.id)
self.assertEqual(request.current_count + 1, request2.current_count)
self.assertEqual(1, len(request2.nodes))
@ -2053,12 +2053,12 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(held_nodes, 1)
# request current_count should not have changed
request3 = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request3 = self.sched_zk_nodepool.getHoldRequest(
request2.id)
self.assertEqual(request2.current_count, request3.current_count)
# Deleting hold request should set held nodes to used
self.scheds.first.sched.zk_nodepool.deleteHoldRequest(request3)
self.sched_zk_nodepool.deleteHoldRequest(request3)
node_states = [n['state'] for n in self.fake_nodepool.getNodes()]
self.assertEqual(3, len(node_states))
self.assertEqual([zuul.model.STATE_USED] * 3, node_states)
@ -2078,9 +2078,9 @@ class TestScheduler(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
request_list = self.sched_zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request = self.sched_zk_nodepool.getHoldRequest(
request_list[0])
self.assertIsNotNone(request)
@ -2103,15 +2103,15 @@ class TestScheduler(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
request_list = self.sched_zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request = self.sched_zk_nodepool.getHoldRequest(
request_list[0])
self.assertIsNotNone(request)
# Delete and verify no more requests
self.assertTrue(client.autohold_delete(request.id))
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
request_list = self.sched_zk_nodepool.getHoldRequests()
self.assertEqual([], request_list)
def test_autohold_padding(self):
@ -2123,9 +2123,9 @@ class TestScheduler(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
request_list = self.sched_zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request = self.sched_zk_nodepool.getHoldRequest(
request_list[0])
self.assertIsNotNone(request)
@ -2136,7 +2136,7 @@ class TestScheduler(ZuulTestCase):
trimmed_request = request.id[5:]
# Delete and verify no more requests
self.assertTrue(client.autohold_delete(trimmed_request))
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
request_list = self.sched_zk_nodepool.getHoldRequests()
self.assertEqual([], request_list)
def _test_autohold_scoped(self, change_obj, change, ref):

View File

@ -102,7 +102,7 @@ class Nodepool(object):
self.election_won = True
try:
for rid in self.zk_nodepool.getNodeRequests():
request = self.zk_nodepool.getNodeRequest(rid)
request = self.zk_nodepool.getNodeRequest(rid, cached=True)
if request.requestor != self.system_id:
continue
if (request.state in {model.STATE_FULFILLED,

View File

@ -100,7 +100,6 @@ from zuul.zk.locks import (
management_queue_lock,
trigger_queue_lock,
)
from zuul.zk.nodepool import ZooKeeperNodepool
from zuul.zk.system import ZuulSystem
COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl']
@ -175,7 +174,6 @@ class Scheduler(threading.Thread):
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self.system = ZuulSystem(self.zk_client)
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
self.component_info = SchedulerComponent(self.zk_client, self.hostname)
self.component_info.register()
self.component_registry = ComponentRegistry(self.zk_client)
@ -541,7 +539,7 @@ class Scheduler(threading.Thread):
# Get all the requests in ZK that belong to us
zk_requests = set()
for req_id in self.nodepool.zk_nodepool.getNodeRequests():
req = self.nodepool.zk_nodepool.getNodeRequest(req_id)
req = self.nodepool.zk_nodepool.getNodeRequest(req_id, cached=True)
if req.requestor == self.system.system_id:
zk_requests.add(req_id)
# Get all the current node requests in the queues
@ -555,7 +553,8 @@ class Scheduler(threading.Thread):
for req_id in leaked_requests:
try:
self.log.warning("Deleting leaked node request: %s", req_id)
req = self.nodepool.zk_nodepool.getNodeRequest(req_id)
req = self.nodepool.zk_nodepool.getNodeRequest(req_id,
cached=True)
self.nodepool.zk_nodepool.deleteNodeRequest(req)
except Exception:
self.log.exception("Error deleting leaked node request: %s",
@ -817,15 +816,15 @@ class Scheduler(threading.Thread):
request.node_expiration = node_hold_expiration
# No need to lock it since we are creating a new one.
self.zk_nodepool.storeHoldRequest(request)
self.nodepool.zk_nodepool.storeHoldRequest(request)
def autohold_list(self):
'''
Return current hold requests as a list of dicts.
'''
data = []
for request_id in self.zk_nodepool.getHoldRequests():
request = self.zk_nodepool.getHoldRequest(request_id)
for request_id in self.nodepool.zk_nodepool.getHoldRequests():
request = self.nodepool.zk_nodepool.getHoldRequest(request_id)
if not request:
continue
data.append(request.toDict())
@ -838,7 +837,8 @@ class Scheduler(threading.Thread):
:param str hold_request_id: The unique ID of the request to delete.
'''
try:
hold_request = self.zk_nodepool.getHoldRequest(hold_request_id)
hold_request = self.nodepool.zk_nodepool.getHoldRequest(
hold_request_id)
except Exception:
self.log.exception(
"Error retrieving autohold ID %s:", hold_request_id)
@ -856,7 +856,8 @@ class Scheduler(threading.Thread):
'''
hold_request = None
try:
hold_request = self.zk_nodepool.getHoldRequest(hold_request_id)
hold_request = self.nodepool.zk_nodepool.getHoldRequest(
hold_request_id)
except Exception:
self.log.exception(
"Error retrieving autohold ID %s:", hold_request_id)
@ -868,7 +869,7 @@ class Scheduler(threading.Thread):
self.log.debug("Removing autohold %s", hold_request)
try:
self.zk_nodepool.deleteHoldRequest(hold_request)
self.nodepool.zk_nodepool.deleteHoldRequest(hold_request)
except Exception:
self.log.exception(
"Error removing autohold request %s:", hold_request)
@ -2085,7 +2086,7 @@ class Scheduler(threading.Thread):
pipeline.manager.onFilesChangesCompleted(event, build_set)
def _doNodesProvisionedEvent(self, event, pipeline):
request = self.zk_nodepool.getNodeRequest(event.request_id)
request = self.nodepool.zk_nodepool.getNodeRequest(event.request_id)
if not request:
self.log.warning("Unable to find request %s while processing"

View File

@ -420,15 +420,17 @@ class ZooKeeperNodepool(ZooKeeperBase):
return sorted(requests)
def getNodeRequest(self, node_request_id):
def getNodeRequest(self, node_request_id, cached=False):
"""
Retrieve a NodeRequest from a given path in ZooKeeper.
:param str node_request_id: The ID of the node request to retrieve.
:param bool cached: Whether to use the cache.
"""
req = self._node_request_cache.get(node_request_id)
if req:
return req
if cached:
req = self._node_request_cache.get(node_request_id)
if req:
return req
path = f"{self.REQUEST_ROOT}/{node_request_id}"
try: