Use a TreeCache for job request queues

To reduce ZK traffic and contention at sites with many executors
and jobs, we move the job request queue to a TreeCache.  This is
a structure we have been using successfully for some time in
nodepool, and we added it to Zuul for the nodepool-in-zuul work.

This uses a single watch for all items in the job request queue and
keeps every request in the queue's local cache up to date.  We still
do not cache request parameters and other ancillary data; we load
those only when needed.
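
For illustration, here is a minimal sketch of the single-watch idea
using kazoo's TreeCache recipe; Zuul actually uses its own
ZuulTreeCache implementation, and the root path below is
hypothetical:

    from kazoo.client import KazooClient
    from kazoo.recipe.cache import TreeCache, TreeEvent

    client = KazooClient(hosts="localhost:2181")
    client.start()

    # Hypothetical queue root used for this sketch.
    REQUEST_ROOT = "/zuul/merger/requests"

    # One tree cache covers every request znode under the root,
    # instead of a ChildrenWatch on the root plus a DataWatch per
    # request.
    cache = TreeCache(client, REQUEST_ROOT)

    def on_event(event):
        # Keep the local view of the queue up to date from events.
        if event.event_type in (TreeEvent.NODE_ADDED,
                                TreeEvent.NODE_UPDATED):
            print("request changed:", event.event_data.path)
        elif event.event_type == TreeEvent.NODE_REMOVED:
            print("request removed:", event.event_data.path)

    cache.listen(on_event)
    cache.start()

    # Reads are served from the local cache; only request parameters
    # (deliberately not cached) need a direct ZooKeeper read.
    node = cache.get_data(REQUEST_ROOT + "/some-uuid")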

A later change will take advantage of the fact that we can now detect
whether a request is locked without locking it ourselves, further
reducing traffic and contention.  This change attempts to minimize
changes beyond what is necessary for the tree cache.
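
As a sketch of that lock check (names assumed from the diff below):
a request's lock lives under <root>/locks/<uuid>, and we can tell
whether it is held by reading the lock's contenders without ever
acquiring it ourselves:

    from kazoo.recipe.lock import Lock

    def is_locked(kazoo_client, lock_root, request_uuid):
        # Reading the contenders is a get_children() on the lock
        # znode; it observes the lock state without creating a
        # contender of our own.
        lock = Lock(kazoo_client, f"{lock_root}/{request_uuid}")
        return len(lock.contenders()) > 0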

Change-Id: Ie3d62dadd9990c847df8302d4da801a990394d2c
James E. Blair
2024-12-05 16:16:50 -08:00
parent 8bf79f0360
commit 317e42a2e5
13 changed files with 244 additions and 273 deletions

View File

@@ -865,7 +865,7 @@ class TestingMergerApi(HoldableMergerApi):
# layer.
all_merge_requests = []
for merge_uuid in self._getAllRequestIds():
merge_request = self.get("/".join(
merge_request = self._get("/".join(
[self.REQUEST_ROOT, merge_uuid]))
if merge_request and (not states or merge_request.state in states):
all_merge_requests.append(merge_request)
@@ -936,7 +936,7 @@ class TestingExecutorApi(HoldableExecutorApi):
for zone in self._getAllZones():
queue = self.zone_queues[zone]
for build_uuid in queue._getAllRequestIds():
build = queue.get(f'{queue.REQUEST_ROOT}/{build_uuid}')
build = queue._get(f'{queue.REQUEST_ROOT}/{build_uuid}')
if build and (not states or build.state in states):
all_builds.append(build)

View File

@@ -16,7 +16,12 @@ import configparser
from zuul.lib.fingergw import FingerGateway
from zuul.zk.components import BaseComponent, ComponentRegistry
from tests.base import iterate_timeout, ZuulTestCase, ZuulWebFixture
from tests.base import (
iterate_timeout,
okay_tracebacks,
ZuulTestCase,
ZuulWebFixture,
)
class TestComponentRegistry(ZuulTestCase):
@@ -56,6 +61,8 @@ class TestComponentRegistry(ZuulTestCase):
def test_scheduler_component(self):
self.assertComponentState("scheduler", BaseComponent.RUNNING)
@okay_tracebacks('_start',
'_playbackWorker')
def test_executor_component(self):
self.assertComponentState("executor", BaseComponent.RUNNING)
@@ -71,6 +78,8 @@ class TestComponentRegistry(ZuulTestCase):
self.executor_server.register_work()
self.assertComponentAttr("executor", "accepting_work", True)
# This can cause tracebacks in the logs when the tree cache
# attempts to restart.
self.executor_server.zk_client.client.stop()
self.assertComponentStopped("executor")

View File

@@ -897,6 +897,15 @@ class TestMerger(ZuulTestCase):
tenant_config_file = 'config/single-tenant/main.yaml'
def getRequest(self, api, request_uuid, state=None):
for _ in iterate_timeout(30, "cache to update"):
req = api.getRequest(request_uuid)
if req:
if state is None:
return req
if req.state == state:
return req
@staticmethod
def _item_from_fake_change(fake_change):
return dict(
@@ -1218,22 +1227,11 @@ class TestMerger(ZuulTestCase):
pipeline_name='check',
event_id='1',
), payload)
b = merger_api.get(f"{merger_api.REQUEST_ROOT}/B")
b = self.getRequest(merger_api, "B")
b.state = MergeRequest.RUNNING
merger_api.update(b)
# Wait until the latest state transition is reflected in the Merger
# APIs cache. Using a DataWatch for this purpose could lead to race
# conditions depending on which DataWatch is executed first. The
# DataWatch might be triggered for the correct event, but the cache
# might still be outdated as the DataWatch that updates the cache
# itself wasn't triggered yet.
cache = merger_api._cached_requests
for _ in iterate_timeout(30, "cache to be up-to-date"):
if (cache and cache[b.path].state == MergeRequest.RUNNING):
break
self.getRequest(merger_api, b.uuid, MergeRequest.RUNNING)
# The lost_merges method should only return merges which are running
# but not locked by any merger, in this case merge b
@@ -1246,7 +1244,7 @@ class TestMerger(ZuulTestCase):
self.log.debug("Removing lost merge requests")
merger_client.cleanupLostMergeRequests()
cache = merger_api._cached_requests
cache = merger_api.cache._cached_objects
for _ in iterate_timeout(30, "cache to be empty"):
if not cache:
break

View File

@@ -101,6 +101,15 @@ class ZooKeeperBaseTestCase(BaseTestCase):
# registry in these tests, so we do it ourselves.
COMPONENT_REGISTRY.create(self.zk_client)
def getRequest(self, api, request_uuid, state=None):
for _ in iterate_timeout(30, "cache to update"):
req = api.getRequest(request_uuid)
if req:
if state is None:
return req
if req.state == state:
return req
class TestZookeeperClient(ZooKeeperBaseTestCase):
@@ -543,7 +552,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
self.assertTrue(server.lock(a, blocking=False))
a.state = BuildRequest.RUNNING
server.update(a)
self.assertEqual(client.get(a.path).state, BuildRequest.RUNNING)
self.getRequest(client, a.uuid, BuildRequest.RUNNING)
# Executor should see no pending requests
reqs = list(server.next())
@@ -552,11 +561,11 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
# Executor pauses build
a.state = BuildRequest.PAUSED
server.update(a)
self.assertEqual(client.get(a.path).state, BuildRequest.PAUSED)
self.getRequest(client, a.uuid, BuildRequest.PAUSED)
# Scheduler resumes build
self.assertTrue(event_queue.empty())
sched_a = client.get(a.path)
sched_a = self.getRequest(client, a.uuid)
client.requestResume(sched_a)
(build_request, event) = event_queue.get(timeout=30)
self.assertEqual(build_request, a)
@@ -566,11 +575,11 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
a.state = BuildRequest.RUNNING
server.update(a)
server.fulfillResume(a)
self.assertEqual(client.get(a.path).state, BuildRequest.RUNNING)
self.getRequest(client, a.uuid, BuildRequest.RUNNING)
# Scheduler cancels build
self.assertTrue(event_queue.empty())
sched_a = client.get(a.path)
sched_a = self.getRequest(client, a.uuid)
client.requestCancel(sched_a)
(build_request, event) = event_queue.get(timeout=30)
self.assertEqual(build_request, a)
@@ -581,7 +590,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
server.update(a)
server.fulfillCancel(a)
server.unlock(a)
self.assertEqual(client.get(a.path).state, BuildRequest.COMPLETED)
self.getRequest(client, a.uuid, BuildRequest.COMPLETED)
# Scheduler removes build request on completion
client.remove(sched_a)
@@ -594,7 +603,6 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
'/zuul/executor/unzoned/result-data',
'/zuul/executor/unzoned/results',
'/zuul/executor/unzoned/waiters']))
self.assertEqual(self.getZKWatches(), {})
def test_build_request_remove(self):
# Test the scheduler forcibly removing a request (perhaps the
@@ -631,7 +639,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
self.assertTrue(server.lock(a, blocking=False))
a.state = BuildRequest.RUNNING
server.update(a)
self.assertEqual(client.get(a.path).state, BuildRequest.RUNNING)
self.getRequest(client, a.uuid, BuildRequest.RUNNING)
# Executor should see no pending requests
reqs = list(server.next())
@@ -639,7 +647,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
self.assertTrue(event_queue.empty())
# Scheduler rudely removes build request
sched_a = client.get(a.path)
sched_a = self.getRequest(client, a.uuid)
client.remove(sched_a)
# Make sure it shows up as deleted
@@ -680,8 +688,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
self.assertEqual(len(reqs), 0)
# Test releases hold
a = client.get(request.path)
self.assertEqual(a.uuid, 'A')
a = self.getRequest(client, request.uuid)
a.state = BuildRequest.REQUESTED
client.update(a)
@@ -711,14 +718,14 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
request = BuildRequest(
"A", None, None, "job", "job_uuid", "tenant", "pipeline", '1')
client.submit(request, {})
sched_a = client.get(request.path)
sched_a = self.getRequest(client, request.uuid)
# Simulate the server side
server = ExecutorApi(self.zk_client,
build_request_callback=rq_put,
build_event_callback=eq_put)
exec_a = server.get(request.path)
exec_a = self.getRequest(client, request.uuid)
client.remove(sched_a)
# Try to lock a request that was just removed
@@ -746,12 +753,12 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
request_b = BuildRequest(
"B", None, None, "job", "job_uuid", "tenant", "pipeline", '2')
client.submit(request_b, {})
sched_b = client.get(request_b.path)
sched_b = self.getRequest(client, request_b.uuid)
request_c = BuildRequest(
"C", None, None, "job", "job_uuid", "tenant", "pipeline", '3')
client.submit(request_c, {})
sched_c = client.get(request_c.path)
sched_c = self.getRequest(client, request_c.uuid)
# Simulate the server side
server = ExecutorApi(self.zk_client,
@@ -768,7 +775,8 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
client.update(sched_b)
client.remove(sched_c)
for _ in iterate_timeout(30, "cache to be up-to-date"):
if (len(server.zone_queues[None]._cached_requests) == 2):
if (len(server.zone_queues[None].cache._cached_objects
) == 2):
break
# Make sure we only got the first request
self.assertEqual(count, 1)
@@ -785,27 +793,28 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
br = BuildRequest(
"B", None, None, "job", "job_uuid", "tenant", "pipeline", '1')
executor_api.submit(br, {})
path_b = br.path
b_uuid = br.uuid
br = BuildRequest(
"C", "zone", None, "job", "job_uuid", "tenant", "pipeline", '1')
executor_api.submit(br, {})
path_c = br.path
c_uuid = br.uuid
br = BuildRequest(
"D", "zone", None, "job", "job_uuid", "tenant", "pipeline", '1')
executor_api.submit(br, {})
path_d = br.path
d_uuid = br.uuid
br = BuildRequest(
"E", "zone", None, "job", "job_uuid", "tenant", "pipeline", '1')
executor_api.submit(br, {})
path_e = br.path
e_uuid = br.uuid
b = executor_api.get(path_b)
c = executor_api.get(path_c)
d = executor_api.get(path_d)
e = executor_api.get(path_e)
executor_api.waitForSync()
b = self.getRequest(executor_api, b_uuid)
c = self.getRequest(executor_api, c_uuid)
d = self.getRequest(executor_api, d_uuid)
e = self.getRequest(executor_api, e_uuid)
# Make sure the get() method used the correct zone keys
self.assertEqual(set(executor_api.zone_queues.keys()), {"zone", None})
@@ -823,19 +832,6 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
e.state = BuildRequest.PAUSED
executor_api.update(e)
# Wait until the latest state transition is reflected in the Executor
# APIs cache. Using a DataWatch for this purpose could lead to race
# conditions depending on which DataWatch is executed first. The
# DataWatch might be triggered for the correct event, but the cache
# might still be outdated as the DataWatch that updates the cache
# itself wasn't triggered yet.
b_cache = executor_api.zone_queues[None]._cached_requests
e_cache = executor_api.zone_queues['zone']._cached_requests
for _ in iterate_timeout(30, "cache to be up-to-date"):
if (b_cache[path_b].state == BuildRequest.RUNNING and
e_cache[path_e].state == BuildRequest.PAUSED):
break
# The lost_builds method should only return builds which are running or
# paused, but not locked by any executor, in this case build b and e.
lost_build_requests = list(executor_api.lostRequests())
@@ -913,7 +909,6 @@ class TestMergerApi(ZooKeeperBaseTestCase):
self.assertEqual(self.getZKPaths(client.RESULT_DATA_ROOT), [])
self.assertEqual(self.getZKPaths(client.WAITER_ROOT), [])
self.assertEqual(self.getZKPaths(client.LOCK_ROOT), [])
self.assertEqual(self.getZKWatches(), {})
def test_merge_request(self):
# Test the lifecycle of a merge request
@@ -957,7 +952,7 @@ class TestMergerApi(ZooKeeperBaseTestCase):
self.assertTrue(server.lock(a, blocking=False))
a.state = MergeRequest.RUNNING
server.update(a)
self.assertEqual(client.get(a.path).state, MergeRequest.RUNNING)
self.getRequest(client, a.uuid, MergeRequest.RUNNING)
# Merger should see no pending requests
reqs = list(server.next())
@@ -1001,7 +996,7 @@ class TestMergerApi(ZooKeeperBaseTestCase):
# Test releases hold
# We have to get a new merge_request object to update it.
a = client.get(f"{client.REQUEST_ROOT}/A")
a = self.getRequest(client, 'A')
self.assertEqual(a.uuid, 'A')
a.state = MergeRequest.REQUESTED
client.update(a)
@@ -1053,7 +1048,7 @@ class TestMergerApi(ZooKeeperBaseTestCase):
self.assertTrue(server.lock(a, blocking=False))
a.state = MergeRequest.RUNNING
server.update(a)
self.assertEqual(client.get(a.path).state, MergeRequest.RUNNING)
self.getRequest(client, a.uuid, MergeRequest.RUNNING)
# Merger reports result
result_data = {'result': 'ok'}
@@ -1148,7 +1143,7 @@ class TestMergerApi(ZooKeeperBaseTestCase):
self.assertTrue(server.lock(a, blocking=False))
a.state = MergeRequest.RUNNING
server.update(a)
self.assertEqual(client.get(a.path).state, MergeRequest.RUNNING)
a = self.getRequest(client, a.uuid, MergeRequest.RUNNING)
# Merger reports result
result_data = {'result': 'ok'}
@@ -1193,7 +1188,7 @@ class TestMergerApi(ZooKeeperBaseTestCase):
pipeline_name='check',
event_id='1',
), payload)
client_a = client.get(f"{client.REQUEST_ROOT}/A")
client_a = self.getRequest(client, 'A')
# Simulate the server side
server = MergerApi(self.zk_client,
@@ -1239,7 +1234,7 @@ class TestMergerApi(ZooKeeperBaseTestCase):
pipeline_name='check',
event_id='2',
), payload)
client_b = client.get(f"{client.REQUEST_ROOT}/B")
client_b = self.getRequest(client, 'B')
client.submit(MergeRequest(
uuid='C',
@@ -1249,7 +1244,7 @@ class TestMergerApi(ZooKeeperBaseTestCase):
pipeline_name='check',
event_id='2',
), payload)
client_c = client.get(f"{client.REQUEST_ROOT}/C")
client_c = self.getRequest(client, 'C')
# Simulate the server side
server = MergerApi(self.zk_client,
@@ -1264,9 +1259,6 @@ class TestMergerApi(ZooKeeperBaseTestCase):
client_b.state = client_b.RUNNING
client.update(client_b)
client.remove(client_c)
for _ in iterate_timeout(30, "cache to be up-to-date"):
if (len(server._cached_requests) == 2):
break
# Make sure we only got the first request
self.assertEqual(count, 1)
@@ -1318,9 +1310,9 @@ class TestMergerApi(ZooKeeperBaseTestCase):
event_id='1',
), payload)
b = merger_api.get(f"{merger_api.REQUEST_ROOT}/B")
c = merger_api.get(f"{merger_api.REQUEST_ROOT}/C")
d = merger_api.get(f"{merger_api.REQUEST_ROOT}/D")
b = self.getRequest(merger_api, 'B')
c = self.getRequest(merger_api, 'C')
d = self.getRequest(merger_api, 'D')
b.state = MergeRequest.RUNNING
merger_api.update(b)
@@ -1332,18 +1324,6 @@ class TestMergerApi(ZooKeeperBaseTestCase):
d.state = MergeRequest.COMPLETED
merger_api.update(d)
# Wait until the latest state transition is reflected in the Merger
# APIs cache. Using a DataWatch for this purpose could lead to race
# conditions depending on which DataWatch is executed first. The
# DataWatch might be triggered for the correct event, but the cache
# might still be outdated as the DataWatch that updates the cache
# itself wasn't triggered yet.
cache = merger_api._cached_requests
for _ in iterate_timeout(30, "cache to be up-to-date"):
if (cache[b.path].state == MergeRequest.RUNNING and
cache[c.path].state == MergeRequest.RUNNING):
break
# The lost_merges method should only return merges which are running
# but not locked by any merger, in this case merge b
lost_merge_requests = list(merger_api.lostRequests())

View File

@@ -49,6 +49,7 @@ class ExecutorClient(object):
def stop(self):
self.log.debug("Stopping")
self.executor_api.stop()
def execute(self, job, nodes, item, pipeline, executor_zone,
dependent_changes=[], merger_items=[]):

View File

@@ -3999,6 +3999,7 @@ class ExecutorServer(BaseMergeServer):
self.stopRepl()
self.monitoring_server.stop()
self.tracing.stop()
self.executor_api.stop()
self.log.debug("Stopped executor")
def join(self):

View File

@@ -186,13 +186,7 @@ def getJobLogStreamAddress(executor_api, uuid, source_zone,
was found we use the worker information to build the log stream
address.
"""
# Search for the build request in ZooKeeper. This iterates over all
# available zones (including unzoned) and stops when the UUID is
# found.
# TODO (felix): Remove the zk_worker_zone return value after a deprecation
# period. This is kept for backwards-compatibility until all executors
# store their zone information in the worker_info dictionary.
build_request, zk_worker_zone = executor_api.getByUuid(uuid)
build_request = executor_api.getRequest(uuid)
if build_request is None:
raise StreamingError("Build not found")
@@ -206,7 +200,7 @@ def getJobLogStreamAddress(executor_api, uuid, source_zone,
if not worker_info:
raise StreamingError("Build did not start yet")
worker_zone = worker_info.get("zone", zk_worker_zone)
worker_zone = worker_info.get("zone")
job_log_stream_address = {}
if worker_zone and source_zone != worker_zone:
info = _getFingerGatewayInZone(worker_zone)

View File

@@ -4833,6 +4833,8 @@ class JobRequest:
self.path = None
self._zstat = None
self.lock = None
self.is_locked = False
self.thread_lock = threading.Lock()
def toDict(self):
return {

View File

@@ -443,6 +443,7 @@ class Scheduler(threading.Thread):
self.zk_client.disconnect()
self.log.debug("Stopping tracing")
self.tracing.stop()
self.executor.stop()
if self.statsd:
self.statsd.close()

View File

@@ -300,7 +300,6 @@ class ZuulTreeCache(abc.ABC):
obj = None
if data:
# Perform an in-place update of the cached object if possible
obj = self._cached_objects.get(key)
if obj:
# Don't update to older data

View File

@@ -41,17 +41,8 @@ class ExecutorQueue(JobRequestQueue):
return self._initial_state_getter()
def lostRequests(self):
# Get a list of requests which are running but not locked by
# any client.
for request in self.inState(self.request_class.RUNNING,
self.request_class.PAUSED):
try:
if self.isLocked(request):
continue
yield request
except NoNodeError:
# Request disappeared
pass
return super().lostRequests(self.request_class.RUNNING,
self.request_class.PAUSED)
class ExecutorApi:
@@ -86,6 +77,10 @@ class ExecutorApi:
# For the side effect of creating a queue
self.zone_queues[zone]
def stop(self):
for queue in self.zone_queues.values():
queue.cache.stop()
def _getInitialState(self):
return BuildRequest.REQUESTED
@@ -128,7 +123,7 @@ class ExecutorApi:
def next(self):
for request in self.inState(BuildRequest.REQUESTED):
for queue in self.zone_queues.values():
request2 = queue._cached_requests.get(request.path)
request2 = queue.getRequest(request.uuid)
if (request2 and
request2.state == BuildRequest.REQUESTED):
yield request2
@@ -150,25 +145,23 @@ class ExecutorApi:
if path.startswith(self.zones_root):
# Remove zone root so we end up with: <zone>/requests/<uuid>
rel_path = path[len(f"{self.zones_root}/"):]
zone = rel_path.split("/")[0]
zone, _, request_uuid = rel_path.split("/")
else:
rel_path = path[len(f"{self.unzoned_root}/"):]
zone = None
return self.zone_queues[zone].get(path)
_, request_uuid = rel_path.split("/")
return self.zone_queues[zone].getRequest(request_uuid)
def getByUuid(self, uuid):
def getRequest(self, uuid):
"""Find a build request by its UUID.
This method will search for the UUID in all available zones.
"""
for zone in self._getAllZones():
request = self.zone_queues[zone].getByUuid(uuid)
request = self.zone_queues[zone].getRequest(uuid)
if request:
# TODO (felix): Remove the zone return value after a
# deprecation period. This is kept for backwards compatibility
# until all executors store their zone information in the
# worker_info dictionary on the BuildRequest.
return request, zone
return None, None
return request
return None
def remove(self, request):
return self.zone_queues[request.zone].remove(request)
@@ -213,3 +206,7 @@ class ExecutorApi:
for queue in self.zone_queues.values():
ret.extend(queue._getAllRequestIds())
return ret
def waitForSync(self):
for queue in self.zone_queues.values():
queue.waitForSync()

View File

@@ -27,11 +27,11 @@ from kazoo.client import TransactionRequest
from zuul.lib.jsonutil import json_dumps
from zuul.lib.logutil import get_annotated_logger
from zuul.model import JobRequest
from zuul.zk import ZooKeeperSimpleBase, sharding
from zuul.zk import sharding
from zuul.zk.event_queues import JobResultFuture
from zuul.zk.exceptions import JobRequestNotFound
from zuul.zk.vendor.watchers import ExistingDataWatch
from zuul.zk.locks import SessionAwareLock
from zuul.zk.cache import ZuulTreeCache
class JobRequestEvent(Enum):
@@ -96,15 +96,122 @@ class RequestUpdater:
)
class JobRequestQueue(ZooKeeperSimpleBase):
class JobRequestCache(ZuulTreeCache):
def __init__(self, client, root, request_class,
request_callback=None,
event_callback=None):
self.request_class = request_class
self.request_callback = request_callback
self.event_callback = event_callback
super().__init__(client, root)
def _parsePath(self, path):
if not path.startswith(self.root):
return None
path = path[len(self.root) + 1:]
parts = path.split('/')
# We are interested in requests whose path parts look like:
# ([<self.items_path>, <self.locks_path>], <uuid>, ...)
if len(parts) < 2:
return None
return parts
def parsePath(self, path):
parts = self._parsePath(path)
if parts is None:
return None
if len(parts) != 2:
return None
if parts[0] == 'requests':
return (parts[0], parts[1])
def preCacheHook(self, event, exists, stat=None):
parts = self._parsePath(event.path)
if parts is None:
return
if parts[0] == 'requests' and self.event_callback:
if event.type == EventType.CREATED:
if len(parts) == 3:
job_event = None
if parts[2] == 'cancel':
job_event = JobRequestEvent.CANCELED
elif parts[2] == 'resume':
job_event = JobRequestEvent.RESUMED
if job_event:
request = self.getRequest(parts[1])
if not request:
return
self.event_callback(request, job_event)
elif event.type == EventType.DELETED:
if len(parts) == 2:
request = self.getRequest(parts[1])
if not request:
return
self.event_callback(request, JobRequestEvent.DELETED)
elif parts[0] == 'locks':
request = self.getRequest(parts[1])
if not request:
return
request.is_locked = exists
def objectFromRaw(self, key, data, stat):
if key[0] == 'requests':
content = self._bytesToDict(data)
request = self.request_class.fromDict(content)
request._zstat = stat
request.path = "/".join([self.root, 'requests', request.uuid])
request._old_state = request.state
return request
def updateFromRaw(self, request, key, data, stat):
content = self._bytesToDict(data)
with request.thread_lock:
# TODO: move thread locking into the TreeCache so we don't
# duplicate this check.
if stat.mzxid >= request._zstat.mzxid:
request.updateFromDict(content)
request._zstat = stat
def postCacheHook(self, event, data, stat, key, obj):
if key[0] == 'requests' and self.request_callback:
if event.type in (EventType.CREATED, EventType.NONE):
self.request_callback()
# This is a test-specific condition: For test cases which
# are using hold_*_jobs_in_queue the state change on the
# request from HOLD to REQUESTED is done outside of the
# server. Thus, we must also set the wake event (the
# callback) so the server can pick up those jobs after they
# are released. To not cause a thundering herd problem in
# production for each cache update, the callback is only
# called under this very specific condition that can only
# occur in the tests.
elif (
self.request_callback
and obj
and obj._old_state == self.request_class.HOLD
and obj.state == self.request_class.REQUESTED
):
obj._old_state = obj.state
self.request_callback()
def getRequest(self, request_uuid):
key = ('requests', request_uuid)
return self._cached_objects.get(key)
def getRequests(self):
return self._cached_objects.values()
class JobRequestQueue:
log = logging.getLogger("zuul.JobRequestQueue")
request_class = JobRequest
def __init__(self, client, root, use_cache=True,
request_callback=None, event_callback=None):
super().__init__(client)
self.use_cache = use_cache
self.zk_client = client
self.kazoo_client = client.client
self.REQUEST_ROOT = f"{root}/requests"
self.LOCK_ROOT = f"{root}/locks"
@@ -113,12 +220,6 @@ class JobRequestQueue(ZooKeeperSimpleBase):
self.RESULT_DATA_ROOT = f"{root}/result-data"
self.WAITER_ROOT = f"{root}/waiters"
self.request_callback = request_callback
self.event_callback = event_callback
# path -> request
self._cached_requests = {}
self.kazoo_client.ensure_path(self.REQUEST_ROOT)
self.kazoo_client.ensure_path(self.PARAM_ROOT)
self.kazoo_client.ensure_path(self.RESULT_ROOT)
@@ -126,108 +227,22 @@ class JobRequestQueue(ZooKeeperSimpleBase):
self.kazoo_client.ensure_path(self.WAITER_ROOT)
self.kazoo_client.ensure_path(self.LOCK_ROOT)
self.register()
if use_cache:
self.cache = JobRequestCache(
client, root, self.request_class,
request_callback, event_callback)
else:
self.cache = None
# So far only used by tests
def waitForSync(self):
self.cache.waitForSync()
@property
def initial_state(self):
# This supports holding requests in tests
return self.request_class.REQUESTED
def register(self):
if self.use_cache:
# Register a child watch that listens for new requests
self.kazoo_client.ChildrenWatch(
self.REQUEST_ROOT,
self._makeRequestWatcher(self.REQUEST_ROOT),
send_event=True,
)
def _makeRequestWatcher(self, path):
def watch(requests, event=None):
return self._watchRequests(path, requests)
return watch
def _watchRequests(self, path, requests):
# The requests list always contains all active children. Thus,
# we first have to find the new ones by calculating the delta
# between the requests list and our current cache entries.
# NOTE (felix): We could also use this list to determine the
# deleted requests, but it's easier to do this in the
# DataWatch for the single request instead. Otherwise we have
# to deal with race conditions between the children and the
# data watch as one watch might update a cache entry while the
# other tries to remove it.
request_paths = {
f"{path}/{uuid}" for uuid in requests
}
new_requests = request_paths - set(
self._cached_requests.keys()
)
for req_path in new_requests:
ExistingDataWatch(self.kazoo_client,
req_path,
self._makeStateWatcher(req_path))
# Notify the user about new requests if a callback is provided.
# When we register the data watch, we will receive an initial
# callback immediately. The list of children may be empty in
# that case, so we should not fire our callback since there
# are no requests to handle.
if new_requests and self.request_callback:
self.request_callback()
def _makeStateWatcher(self, path):
def watch(data, stat, event=None):
return self._watchState(path, data, stat, event)
return watch
def _watchState(self, path, data, stat, event=None):
if (not event or event.type == EventType.CHANGED) and data is not None:
# As we already get the data and the stat value, we can directly
# use it without asking ZooKeeper for the data again.
content = self._bytesToDict(data)
if not content:
return
# We need this one for the HOLD -> REQUESTED check further down
old_request = self._cached_requests.get(path)
request = self.request_class.fromDict(content)
request.path = path
request._zstat = stat
self._cached_requests[path] = request
# NOTE (felix): This is a test-specific condition: For test cases
# which are using hold_*_jobs_in_queue the state change on the
# request from HOLD to REQUESTED is done outside of the server.
# Thus, we must also set the wake event (the callback) so the
# server can pick up those jobs after they are released. To not
# cause a thundering herd problem in production for each cache
# update, the callback is only called under this very specific
# condition that can only occur in the tests.
if (
self.request_callback
and old_request
and old_request.state == self.request_class.HOLD
and request.state == self.request_class.REQUESTED
):
self.request_callback()
elif ((event and event.type == EventType.DELETED) or data is None):
request = self._cached_requests.get(path)
with suppress(KeyError):
del self._cached_requests[path]
if request and self.event_callback:
self.event_callback(request, JobRequestEvent.DELETED)
# Return False to stop the datawatch as the build got deleted.
return False
def inState(self, *states):
if not states:
# If no states are provided, build a tuple containing all available
@@ -236,7 +251,7 @@ class JobRequestQueue(ZooKeeperSimpleBase):
states = self.request_class.ALL_STATES
requests = [
req for req in list(self._cached_requests.values())
req for req in list(self.cache.getRequests())
if req.state in states
]
@@ -247,7 +262,7 @@ class JobRequestQueue(ZooKeeperSimpleBase):
def next(self):
for request in self.inState(self.request_class.REQUESTED):
request = self._cached_requests.get(request.path)
request = self.cache.getRequest(request.uuid)
if (request and
request.state == self.request_class.REQUESTED):
yield request
@@ -277,7 +292,7 @@ class JobRequestQueue(ZooKeeperSimpleBase):
[self.WAITER_ROOT, request.uuid]
)
self.kazoo_client.create(waiter_path, ephemeral=True)
result = JobResultFuture(self.client, request.path,
result = JobResultFuture(self.zk_client, request.path,
result_path, waiter_path)
request.result_path = result_path
@@ -322,15 +337,14 @@ class JobRequestQueue(ZooKeeperSimpleBase):
self.kazoo_client.create(request.result_path,
self._dictToBytes(data))
def get(self, path):
"""Get a request
def getRequest(self, request_uuid):
if self.cache:
return self.cache.getRequest(request_uuid)
else:
path = "/".join([self.REQUEST_ROOT, request_uuid])
return self._get(path)
Note: do not mix get with iteration; iteration returns cached
requests while get returns a newly created object each
time. If you lock a request, you must use the same object to
unlock it.
"""
def _get(self, path):
try:
data, zstat = self.kazoo_client.get(path)
except NoNodeError:
@@ -340,18 +354,11 @@ class JobRequestQueue(ZooKeeperSimpleBase):
return None
content = self._bytesToDict(data)
request = self.request_class.fromDict(content)
request.path = path
request._zstat = zstat
return request
def getByUuid(self, uuid):
"""Get a request by its UUID without using the cache."""
path = f"{self.REQUEST_ROOT}/{uuid}"
return self.get(path)
def refresh(self, request):
"""Refreshs a request object with the current data from ZooKeeper. """
try:
@@ -366,8 +373,10 @@ class JobRequestQueue(ZooKeeperSimpleBase):
content = self._bytesToDict(data)
request.updateFromDict(content)
request._zstat = zstat
with request.thread_lock:
if zstat.mzxid >= request._zstat.mzxid:
request.updateFromDict(content)
request._zstat = zstat
def remove(self, request):
log = get_annotated_logger(self.log, request.event_id)
@@ -397,20 +406,6 @@ class JobRequestQueue(ZooKeeperSimpleBase):
def fulfillCancel(self, request):
self.kazoo_client.delete(f"{request.path}/cancel")
def _watchEvents(self, actions, event=None):
if event is None:
return
job_event = None
if "cancel" in actions:
job_event = JobRequestEvent.CANCELED
elif "resume" in actions:
job_event = JobRequestEvent.RESUMED
if job_event:
request = self._cached_requests.get(event.path)
self.event_callback(request, job_event)
def lock(self, request, blocking=True, timeout=None):
path = "/".join([self.LOCK_ROOT, request.uuid])
have_lock = False
@@ -444,13 +439,6 @@ class JobRequestQueue(ZooKeeperSimpleBase):
return False
request.lock = lock
# Create the children watch to listen for cancel/resume actions on this
# build request.
if self.event_callback:
self.kazoo_client.ChildrenWatch(
request.path, self._watchEvents, send_event=True)
return True
def _releaseLock(self, request, lock):
@@ -498,24 +486,28 @@ class JobRequestQueue(ZooKeeperSimpleBase):
is_locked = len(lock.contenders()) > 0
return is_locked
def lostRequests(self):
def lostRequests(self, *states):
# Get a list of requests which are running but not locked by
# any client.
for req in self.inState(self.request_class.RUNNING):
if not states:
states = (self.request_class.RUNNING,)
for req in self.inState(*states):
try:
if self.isLocked(req):
continue
# It may have completed in the interim, so double
# check that.
if req.state not in states:
continue
# We may be racing a cache update, so manually
# refresh and check again.
self.refresh(req)
if req.state not in states:
continue
except NoNodeError:
# Request was removed in the meantime
continue
# Double check that our cache isn't out of date: it should
# still exist and be running.
oldreq = req
req = self.get(oldreq.path)
if req is None:
self._deleteLock(oldreq.uuid)
elif req.state == self.request_class.RUNNING:
yield req
yield req
def _getAllRequestIds(self):
# Get a list of all request ids without using the cache.

View File

@@ -100,9 +100,6 @@ class LockableZKObjectCache(ZuulTreeCache):
def updateFromRaw(self, obj, key, data, zstat):
obj._updateFromRaw(data, zstat, None)
def objectFromDict(self, d, zstat):
return self.zkobject_class._fromRaw(d, zstat, None)
def getItem(self, item_id):
self.ensureReady()
return self._cached_objects.get((item_id,))