diff --git a/tests/base.py b/tests/base.py index 602557b887..b62df684fc 100644 --- a/tests/base.py +++ b/tests/base.py @@ -3165,14 +3165,11 @@ class HoldableMergerApi(MergerApi): self.hold_in_queue = False self.history = {} - def submit(self, uuid, job_type, build_set_uuid, tenant_name, - pipeline_name, params, event_id, precedence=0, - needs_result=False): - self.log.debug("Appending merge job to history: %s", uuid) - self.history[uuid] = FakeMergeRequest(uuid, job_type, params) - return super().submit( - uuid, job_type, build_set_uuid, tenant_name, pipeline_name, params, - event_id, precedence, needs_result) + def submit(self, request, params, needs_result=False): + self.log.debug("Appending merge job to history: %s", request.uuid) + self.history[request.uuid] = FakeMergeRequest( + request.uuid, request.job_type, params) + return super().submit(request, params, needs_result) @property def initial_state(self): @@ -3190,15 +3187,13 @@ class TestingMergerApi(HoldableMergerApi): # the merge requests directly from ZooKeeper and not from a cache # layer. all_merge_requests = [] - for merge_uuid in self.kazoo_client.get_children( - self.MERGE_REQUEST_ROOT - ): + for merge_uuid in self._getAllRequestIds(): merge_request = self.get("/".join( - [self.MERGE_REQUEST_ROOT, merge_uuid])) + [self.REQUEST_ROOT, merge_uuid])) if merge_request and (not states or merge_request.state in states): all_merge_requests.append(merge_request) - return all_merge_requests + return sorted(all_merge_requests) def release(self, merge_request=None): """ @@ -3251,11 +3246,10 @@ class RecordingMergeClient(zuul.merger.client.MergeClient): class HoldableExecutorApi(ExecutorApi): def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) self.hold_in_queue = False + super().__init__(*args, **kwargs) - @property - def initial_state(self): + def _getInitialState(self): if self.hold_in_queue: return BuildRequest.HOLD return BuildRequest.REQUESTED @@ -3268,16 +3262,12 @@ class TestingExecutorApi(HoldableExecutorApi): # As this method is used for assertions in the tests, it # should look up the build requests directly from ZooKeeper # and not from a cache layer. 
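For reference, the object-based submit interface that the updated test fakes now wrap looks roughly like this (a sketch assuming a connected MergerApi instance named merger_api, as in the tests further down; the uuid and payload values are illustrative):

    request = MergeRequest(
        uuid='A',
        job_type=MergeRequest.MERGE,
        build_set_uuid='AA',
        tenant_name='tenant',
        pipeline_name='check',
        event_id='1',
    )
    # Parameters are now passed alongside the request instead of inside it.
    merger_api.submit(request, {'merge': 'test'})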
- if self.zone_filter: - zones = self.zone_filter - else: - zones = self._getAllZones() all_builds = [] - for zone in zones: - zone_path = self._getZoneRoot(zone) - for build_uuid in self._getAllBuildIds([zone]): - build = self.get("/".join([zone_path, build_uuid])) + for zone in self._getAllZones(): + queue = self.zone_queues[zone] + for build_uuid in queue._getAllRequestIds(): + build = queue.get(f'{queue.REQUEST_ROOT}/{build_uuid}') if build and (not states or build.state in states): all_builds.append(build) @@ -3293,7 +3283,7 @@ class TestingExecutorApi(HoldableExecutorApi): self._test_build_request_job_map = {} if build_request.uuid in self._test_build_request_job_map: return self._test_build_request_job_map[build_request.uuid] - d = self.getBuildParams(build_request) + d = self.getParams(build_request) if d: data = d.get('job', '') else: diff --git a/tests/unit/test_merger_repo.py b/tests/unit/test_merger_repo.py index ee939d9497..628b73ad63 100644 --- a/tests/unit/test_merger_repo.py +++ b/tests/unit/test_merger_repo.py @@ -1025,17 +1025,16 @@ class TestMerger(ZuulTestCase): merger_api = MergerApi(self.zk_client) payload = {'merge': 'test'} - merger_api.submit( + merger_api.submit(MergeRequest( uuid='B', job_type=MergeRequest.MERGE, build_set_uuid='BB', tenant_name='tenant', pipeline_name='check', - params=payload, event_id='1', - ) + ), payload) - b = merger_api.get(f"{merger_api.MERGE_REQUEST_ROOT}/B") + b = merger_api.get(f"{merger_api.REQUEST_ROOT}/B") b.state = MergeRequest.RUNNING merger_api.update(b) @@ -1046,14 +1045,14 @@ class TestMerger(ZuulTestCase): # DataWatch might be triggered for the correct event, but the cache # might still be outdated as the DataWatch that updates the cache # itself wasn't triggered yet. - cache = merger_api._cached_merge_requests + cache = merger_api._cached_requests for _ in iterate_timeout(30, "cache to be up-to-date"): if (cache and cache[b.path].state == MergeRequest.RUNNING): break # The lost_merges method should only return merges which are running # but not locked by any merger, in this case merge b - lost_merge_requests = list(merger_api.lostMergeRequests()) + lost_merge_requests = list(merger_api.lostRequests()) self.assertEqual(1, len(lost_merge_requests)) self.assertEqual(b.path, lost_merge_requests[0].path) @@ -1062,7 +1061,7 @@ class TestMerger(ZuulTestCase): merger_client = self.scheds.first.sched.merger merger_client.cleanupLostMergeRequests() - lost_merge_requests = list(merger_api.lostMergeRequests()) + lost_merge_requests = list(merger_api.lostRequests()) self.assertEqual(0, len(lost_merge_requests)) diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index e63342208c..efc3a61a6a 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -1015,8 +1015,7 @@ class TestScheduler(ZuulTestCase): self.assertEqual(len(self.builds), 0) self.assertEqual(len(queue), 1) self.assertEqual(queue[0].zone, None) - params = self.executor_server.executor_api.getBuildParams( - queue[0]) + params = self.executor_server.executor_api.getParams(queue[0]) self.assertEqual(params['job'], 'project-merge') self.assertEqual(params['items'][0]['number'], '%d' % A.number) @@ -1027,8 +1026,8 @@ class TestScheduler(ZuulTestCase): self.executor_api.release('.*-merge') self.waitUntilSettled() queue = list(self.executor_api.queued()) - params = [self.executor_server.executor_api.getBuildParams( - item) for item in queue] + params = [self.executor_server.executor_api.getParams(item) + for item in queue] 
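The getParams() rename used in these assertions pairs with the same object-based submit on the executor side; a sketch of the round trip, assuming an ExecutorApi bound to a ZooKeeper client (values illustrative):

    request = BuildRequest("A", None, "tenant", "pipeline", '1')
    executor_api.submit(request, {'job': 'test'})

    executor_api.getParams(request)    # -> {'job': 'test'}
    executor_api.clearParams(request)  # params are erased once the job is accepted
    executor_api.getParams(request)    # -> None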
self.assertEqual(len(self.builds), 0) self.assertEqual(len(queue), 6) diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index da3ddbaafc..b6e99e9405 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -22,7 +22,8 @@ from zuul.model import BuildRequest, HoldRequest, MergeRequest from zuul.zk import ZooKeeperClient from zuul.zk.config_cache import SystemConfigCache, UnparsedConfigCache from zuul.zk.exceptions import LockException -from zuul.zk.executor import ExecutorApi, BuildRequestEvent +from zuul.zk.executor import ExecutorApi +from zuul.zk.job_request_queue import JobRequestEvent from zuul.zk.merger import MergerApi from zuul.zk.layout import LayoutStateStore, LayoutState from zuul.zk.locks import locked @@ -386,7 +387,8 @@ class TestExecutorApi(ZooKeeperBaseTestCase): build_event_callback=eq_put) # Scheduler submits request - client.submit("A", "tenant", "pipeline", {'job': 'test'}, None, '1') + request = BuildRequest("A", None, "tenant", "pipeline", '1') + client.submit(request, {'job': 'test'}) request_queue.get(timeout=30) # Executor receives request @@ -394,10 +396,10 @@ class TestExecutorApi(ZooKeeperBaseTestCase): self.assertEqual(len(reqs), 1) a = reqs[0] self.assertEqual(a.uuid, 'A') - params = client.getBuildParams(a) + params = client.getParams(a) self.assertEqual(params, {'job': 'test'}) - client.clearBuildParams(a) - params = client.getBuildParams(a) + client.clearParams(a) + params = client.getParams(a) self.assertIsNone(params) # Executor locks request @@ -421,7 +423,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase): client.requestResume(sched_a) (build_request, event) = event_queue.get(timeout=30) self.assertEqual(build_request, a) - self.assertEqual(event, BuildRequestEvent.RESUMED) + self.assertEqual(event, JobRequestEvent.RESUMED) # Executor resumes build a.state = BuildRequest.RUNNING @@ -435,7 +437,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase): client.requestCancel(sched_a) (build_request, event) = event_queue.get(timeout=30) self.assertEqual(build_request, a) - self.assertEqual(event, BuildRequestEvent.CANCELED) + self.assertEqual(event, JobRequestEvent.CANCELED) # Executor aborts build a.state = BuildRequest.COMPLETED @@ -450,12 +452,14 @@ class TestExecutorApi(ZooKeeperBaseTestCase): # Scheduler removes build request on completion client.remove(sched_a) - self.assertEqual(set(self._get_zk_tree(client.BUILD_REQUEST_ROOT)), - set(['/zuul/build-requests/unzoned', - '/zuul/build-requests/zones'])) - self.assertEqual(self._get_zk_tree( - client.BUILD_REQUEST_ROOT + '/zones'), []) - self.assertEqual(self._get_zk_tree(client.LOCK_ROOT), []) + self.assertEqual(set(self._get_zk_tree('/zuul/executor')), + set(['/zuul/executor/unzoned', + '/zuul/executor/unzoned/locks', + '/zuul/executor/unzoned/params', + '/zuul/executor/unzoned/requests', + '/zuul/executor/unzoned/result-data', + '/zuul/executor/unzoned/results', + '/zuul/executor/unzoned/waiters'])) self.assertEqual(self._get_watches(), {}) def test_build_request_remove(self): @@ -478,7 +482,8 @@ class TestExecutorApi(ZooKeeperBaseTestCase): build_event_callback=eq_put) # Scheduler submits request - client.submit("A", "tenant", "pipeline", {}, None, '1') + request = BuildRequest("A", None, "tenant", "pipeline", '1') + client.submit(request, {}) request_queue.get(timeout=30) # Executor receives request @@ -505,7 +510,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase): # Make sure it shows up as deleted (build_request, event) = event_queue.get(timeout=30) self.assertEqual(build_request, a) - 
self.assertEqual(event, BuildRequestEvent.DELETED) + self.assertEqual(event, JobRequestEvent.DELETED) # Executor should not write anything else since the request # was deleted. @@ -530,7 +535,8 @@ class TestExecutorApi(ZooKeeperBaseTestCase): build_event_callback=eq_put) # Scheduler submits request - a_path = client.submit("A", "tenant", "pipeline", {}, None, '1') + request = BuildRequest("A", None, "tenant", "pipeline", '1') + client.submit(request, {}) request_queue.get(timeout=30) # Executor receives nothing @@ -538,7 +544,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase): self.assertEqual(len(reqs), 0) # Test releases hold - a = client.get(a_path) + a = client.get(request.path) self.assertEqual(a.uuid, 'A') a.state = BuildRequest.REQUESTED client.update(a) @@ -566,15 +572,16 @@ class TestExecutorApi(ZooKeeperBaseTestCase): client = ExecutorApi(self.zk_client) # Scheduler submits request - a_path = client.submit("A", "tenant", "pipeline", {}, None, '1') - sched_a = client.get(a_path) + request = BuildRequest("A", None, "tenant", "pipeline", '1') + client.submit(request, {}) + sched_a = client.get(request.path) # Simulate the server side server = ExecutorApi(self.zk_client, build_request_callback=rq_put, build_event_callback=eq_put) - exec_a = server.get(a_path) + exec_a = server.get(request.path) client.remove(sched_a) # Try to lock a request that was just removed @@ -585,15 +592,24 @@ class TestExecutorApi(ZooKeeperBaseTestCase): # requests executor_api = ExecutorApi(self.zk_client) - executor_api.submit("A", "tenant", "pipeline", {}, "zone", '1') - path_b = executor_api.submit("B", "tenant", "pipeline", {}, - None, '1') - path_c = executor_api.submit("C", "tenant", "pipeline", {}, - "zone", '1') - path_d = executor_api.submit("D", "tenant", "pipeline", {}, - "zone", '1') - path_e = executor_api.submit("E", "tenant", "pipeline", {}, - "zone", '1') + br = BuildRequest("A", "zone", "tenant", "pipeline", '1') + executor_api.submit(br, {}) + + br = BuildRequest("B", None, "tenant", "pipeline", '1') + executor_api.submit(br, {}) + path_b = br.path + + br = BuildRequest("C", "zone", "tenant", "pipeline", '1') + executor_api.submit(br, {}) + path_c = br.path + + br = BuildRequest("D", "zone", "tenant", "pipeline", '1') + executor_api.submit(br, {}) + path_d = br.path + + br = BuildRequest("E", "zone", "tenant", "pipeline", '1') + executor_api.submit(br, {}) + path_e = br.path b = executor_api.get(path_b) c = executor_api.get(path_c) @@ -619,15 +635,16 @@ class TestExecutorApi(ZooKeeperBaseTestCase): # DataWatch might be triggered for the correct event, but the cache # might still be outdated as the DataWatch that updates the cache # itself wasn't triggered yet. - cache = executor_api._cached_build_requests + b_cache = executor_api.zone_queues[None]._cached_requests + e_cache = executor_api.zone_queues['zone']._cached_requests for _ in iterate_timeout(30, "cache to be up-to-date"): - if (cache[path_b].state == BuildRequest.RUNNING and - cache[path_e].state == BuildRequest.PAUSED): + if (b_cache[path_b].state == BuildRequest.RUNNING and + e_cache[path_e].state == BuildRequest.PAUSED): break # The lost_builds method should only return builds which are running or # paused, but not locked by any executor, in this case build b and e. 
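The BuildRequestEvent -> JobRequestEvent rename also covers the executor's event callback; a hedged sketch of a dispatcher in the style of the ExecutorServer changes later in this patch (the function name here is illustrative):

    def on_build_event(build_request, event):
        if event == JobRequestEvent.CANCELED:
            executor_api.fulfillCancel(build_request)  # ack by removing the child node
            # ... then stop the running job
        elif event == JobRequestEvent.RESUMED:
            executor_api.fulfillResume(build_request)
            # ... then resume the paused job
        elif event == JobRequestEvent.DELETED:
            # The request is gone; only the local job needs stopping.
            pass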
- lost_build_requests = list(executor_api.lostBuildRequests()) + lost_build_requests = list(executor_api.lostRequests()) self.assertEqual(2, len(lost_build_requests)) self.assertEqual(b.path, lost_build_requests[0].path) @@ -636,17 +653,17 @@ class TestExecutorApi(ZooKeeperBaseTestCase): # Test cleaning up orphaned request parameters executor_api = ExecutorApi(self.zk_client) - path_a = executor_api.submit( - "A", "tenant", "pipeline", {}, "zone", '1') + br = BuildRequest("A", "zone", "tenant", "pipeline", '1') + executor_api.submit(br, {}) - params_root = executor_api.BUILD_PARAMS_ROOT - self.assertEqual(len(executor_api._getAllBuildIds()), 1) + params_root = executor_api.zone_queues['zone'].PARAM_ROOT + self.assertEqual(len(executor_api._getAllRequestIds()), 1) self.assertEqual(len( self.zk_client.client.get_children(params_root)), 1) # Delete the request but not the params - self.zk_client.client.delete(path_a) - self.assertEqual(len(executor_api._getAllBuildIds()), 0) + self.zk_client.client.delete(br.path) + self.assertEqual(len(executor_api._getAllRequestIds()), 0) self.assertEqual(len( self.zk_client.client.get_children(params_root)), 1) @@ -673,7 +690,8 @@ class TestExecutorApi(ZooKeeperBaseTestCase): # Simulate the client side client = ExecutorApi(self.zk_client) - client.submit("A", "tenant", "pipeline", {}, None, '1') + client.submit( + BuildRequest("A", None, "tenant", "pipeline", '1'), {}) # Simulate the server side server = ExecutorApi(self.zk_client, @@ -720,14 +738,13 @@ class TestMergerApi(ZooKeeperBaseTestCase): sessions = None return ret - def _assertEmptyRoots(self): - api = MergerApi - self.assertEqual(self._get_zk_tree(api.MERGE_REQUEST_ROOT), []) - self.assertEqual(self._get_zk_tree(api.MERGE_PARAMS_ROOT), []) - self.assertEqual(self._get_zk_tree(api.MERGE_RESULT_ROOT), []) - self.assertEqual(self._get_zk_tree(api.MERGE_RESULT_DATA_ROOT), []) - self.assertEqual(self._get_zk_tree(api.MERGE_WAITER_ROOT), []) - self.assertEqual(self._get_zk_tree(api.LOCK_ROOT), []) + def _assertEmptyRoots(self, client): + self.assertEqual(self._get_zk_tree(client.REQUEST_ROOT), []) + self.assertEqual(self._get_zk_tree(client.PARAM_ROOT), []) + self.assertEqual(self._get_zk_tree(client.RESULT_ROOT), []) + self.assertEqual(self._get_zk_tree(client.RESULT_DATA_ROOT), []) + self.assertEqual(self._get_zk_tree(client.WAITER_ROOT), []) + self.assertEqual(self._get_zk_tree(client.LOCK_ROOT), []) self.assertEqual(self._get_watches(), {}) def test_merge_request(self): @@ -746,15 +763,15 @@ class TestMergerApi(ZooKeeperBaseTestCase): # Scheduler submits request payload = {'merge': 'test'} - client.submit( + request = MergeRequest( uuid='A', job_type=MergeRequest.MERGE, build_set_uuid='AA', tenant_name='tenant', pipeline_name='check', - params=payload, event_id='1', ) + client.submit(request, payload) request_queue.get(timeout=30) # Merger receives request @@ -762,10 +779,10 @@ class TestMergerApi(ZooKeeperBaseTestCase): self.assertEqual(len(reqs), 1) a = reqs[0] self.assertEqual(a.uuid, 'A') - params = client.getMergeParams(a) + params = client.getParams(a) self.assertEqual(params, payload) - client.clearMergeParams(a) - params = client.getMergeParams(a) + client.clearParams(a) + params = client.getParams(a) self.assertIsNone(params) # Merger locks request @@ -782,7 +799,7 @@ class TestMergerApi(ZooKeeperBaseTestCase): server.remove(a) server.unlock(a) - self._assertEmptyRoots() + self._assertEmptyRoots(client) def test_merge_request_hold(self): # Test that we can hold a merge request in "queue" 
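The new _assertEmptyRoots() signature reflects that every root now hangs off the API instance and shares a single prefix per queue (see JobRequestQueue.__init__ below); for the merger this resolves to:

    client.REQUEST_ROOT      # /zuul/merger/requests
    client.PARAM_ROOT        # /zuul/merger/params
    client.RESULT_ROOT       # /zuul/merger/results
    client.RESULT_DATA_ROOT  # /zuul/merger/result-data
    client.WAITER_ROOT       # /zuul/merger/waiters
    client.LOCK_ROOT         # /zuul/merger/locks

The executor uses the same layout under /zuul/executor/unzoned, with an equivalent tree per zone for zoned executors.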
@@ -800,15 +817,14 @@ class TestMergerApi(ZooKeeperBaseTestCase): # Scheduler submits request payload = {'merge': 'test'} - client.submit( + client.submit(MergeRequest( uuid='A', job_type=MergeRequest.MERGE, build_set_uuid='AA', tenant_name='tenant', pipeline_name='check', - params=payload, event_id='1', - ) + ), payload) request_queue.get(timeout=30) # Merger receives nothing @@ -817,7 +833,7 @@ class TestMergerApi(ZooKeeperBaseTestCase): # Test releases hold # We have to get a new merge_request object to update it. - a = client.get(f"{client.MERGE_REQUEST_ROOT}/A") + a = client.get(f"{client.REQUEST_ROOT}/A") self.assertEqual(a.uuid, 'A') a.state = MergeRequest.REQUESTED client.update(a) @@ -831,7 +847,7 @@ class TestMergerApi(ZooKeeperBaseTestCase): server.remove(a) # The rest is redundant. - self._assertEmptyRoots() + self._assertEmptyRoots(client) def test_merge_request_result(self): # Test the lifecycle of a merge request @@ -849,16 +865,14 @@ class TestMergerApi(ZooKeeperBaseTestCase): # Scheduler submits request payload = {'merge': 'test'} - future = client.submit( + future = client.submit(MergeRequest( uuid='A', job_type=MergeRequest.MERGE, build_set_uuid='AA', tenant_name='tenant', pipeline_name='check', - params=payload, event_id='1', - needs_result=True, - ) + ), payload, needs_result=True) request_queue.get(timeout=30) # Merger receives request @@ -877,13 +891,13 @@ class TestMergerApi(ZooKeeperBaseTestCase): result_data = {'result': 'ok'} server.reportResult(a, result_data) - self.assertEqual(set(self._get_zk_tree(client.MERGE_RESULT_ROOT)), - set(['/zuul/merge-results/A'])) - self.assertEqual(set(self._get_zk_tree(client.MERGE_RESULT_DATA_ROOT)), - set(['/zuul/merge-result-data/A', - '/zuul/merge-result-data/A/0000000000'])) - self.assertEqual(self._get_zk_tree(client.MERGE_WAITER_ROOT), - ['/zuul/merge-waiters/A']) + self.assertEqual(set(self._get_zk_tree(client.RESULT_ROOT)), + set(['/zuul/merger/results/A'])) + self.assertEqual(set(self._get_zk_tree(client.RESULT_DATA_ROOT)), + set(['/zuul/merger/result-data/A', + '/zuul/merger/result-data/A/0000000000'])) + self.assertEqual(self._get_zk_tree(client.WAITER_ROOT), + ['/zuul/merger/waiters/A']) # Merger removes and unlocks merge request on completion server.remove(a) @@ -893,7 +907,7 @@ class TestMergerApi(ZooKeeperBaseTestCase): self.assertTrue(future.wait()) self.assertEqual(future.data, result_data) - self._assertEmptyRoots() + self._assertEmptyRoots(client) def test_lost_merge_request_params(self): # Test cleaning up orphaned request parameters @@ -901,18 +915,17 @@ class TestMergerApi(ZooKeeperBaseTestCase): # Scheduler submits request payload = {'merge': 'test'} - merger_api.submit( + merger_api.submit(MergeRequest( uuid='A', job_type=MergeRequest.MERGE, build_set_uuid='AA', tenant_name='tenant', pipeline_name='check', - params=payload, event_id='1', - ) - path_a = '/'.join([merger_api.MERGE_REQUEST_ROOT, 'A']) + ), payload) + path_a = '/'.join([merger_api.REQUEST_ROOT, 'A']) - params_root = merger_api.MERGE_PARAMS_ROOT + params_root = merger_api.PARAM_ROOT self.assertEqual(len(merger_api._getAllRequestIds()), 1) self.assertEqual(len( self.zk_client.client.get_children(params_root)), 1) @@ -928,7 +941,7 @@ class TestMergerApi(ZooKeeperBaseTestCase): self.assertEqual(len( self.zk_client.client.get_children(params_root)), 0) - self._assertEmptyRoots() + self._assertEmptyRoots(merger_api) def test_lost_merge_request_result(self): # Test that we can clean up orphaned merge results @@ -946,16 +959,15 @@ class 
TestMergerApi(ZooKeeperBaseTestCase): # Scheduler submits request payload = {'merge': 'test'} - future = client.submit( + future = client.submit(MergeRequest( uuid='A', job_type=MergeRequest.MERGE, build_set_uuid='AA', tenant_name='tenant', pipeline_name='check', - params=payload, event_id='1', - needs_result=True, - ) + ), payload, needs_result=True) + request_queue.get(timeout=30) # Merger receives request @@ -978,13 +990,13 @@ class TestMergerApi(ZooKeeperBaseTestCase): server.remove(a) server.unlock(a) - self.assertEqual(set(self._get_zk_tree(client.MERGE_RESULT_ROOT)), - set(['/zuul/merge-results/A'])) - self.assertEqual(set(self._get_zk_tree(client.MERGE_RESULT_DATA_ROOT)), - set(['/zuul/merge-result-data/A', - '/zuul/merge-result-data/A/0000000000'])) - self.assertEqual(self._get_zk_tree(client.MERGE_WAITER_ROOT), - ['/zuul/merge-waiters/A']) + self.assertEqual(set(self._get_zk_tree(client.RESULT_ROOT)), + set(['/zuul/merger/results/A'])) + self.assertEqual(set(self._get_zk_tree(client.RESULT_DATA_ROOT)), + set(['/zuul/merger/result-data/A', + '/zuul/merger/result-data/A/0000000000'])) + self.assertEqual(self._get_zk_tree(client.WAITER_ROOT), + ['/zuul/merger/waiters/A']) # Scheduler "disconnects" self.zk_client.client.delete(future._waiter_path) @@ -992,7 +1004,7 @@ class TestMergerApi(ZooKeeperBaseTestCase): # Find orphaned results client.cleanup(age=0) - self._assertEmptyRoots() + self._assertEmptyRoots(client) def test_nonexistent_lock(self): request_queue = queue.Queue() @@ -1005,16 +1017,15 @@ class TestMergerApi(ZooKeeperBaseTestCase): # Scheduler submits request payload = {'merge': 'test'} - client.submit( + client.submit(MergeRequest( uuid='A', job_type=MergeRequest.MERGE, build_set_uuid='AA', tenant_name='tenant', pipeline_name='check', - params=payload, event_id='1', - ) - client_a = client.get(f"{client.MERGE_REQUEST_ROOT}/A") + ), payload) + client_a = client.get(f"{client.REQUEST_ROOT}/A") # Simulate the server side server = MergerApi(self.zk_client, @@ -1025,7 +1036,7 @@ class TestMergerApi(ZooKeeperBaseTestCase): # Try to lock a request that was just removed self.assertFalse(server.lock(server_a)) - self._assertEmptyRoots() + self._assertEmptyRoots(client) def test_lost_merge_requests(self): # Test that lostMergeRequests() returns unlocked running merge @@ -1033,46 +1044,42 @@ class TestMergerApi(ZooKeeperBaseTestCase): merger_api = MergerApi(self.zk_client) payload = {'merge': 'test'} - merger_api.submit( + merger_api.submit(MergeRequest( uuid='A', job_type=MergeRequest.MERGE, build_set_uuid='AA', tenant_name='tenant', pipeline_name='check', - params=payload, event_id='1', - ) - merger_api.submit( + ), payload) + merger_api.submit(MergeRequest( uuid='B', job_type=MergeRequest.MERGE, build_set_uuid='BB', tenant_name='tenant', pipeline_name='check', - params=payload, event_id='1', - ) - merger_api.submit( + ), payload) + merger_api.submit(MergeRequest( uuid='C', job_type=MergeRequest.MERGE, build_set_uuid='CC', tenant_name='tenant', pipeline_name='check', - params=payload, event_id='1', - ) - merger_api.submit( + ), payload) + merger_api.submit(MergeRequest( uuid='D', job_type=MergeRequest.MERGE, build_set_uuid='DD', tenant_name='tenant', pipeline_name='check', - params=payload, event_id='1', - ) + ), payload) - b = merger_api.get(f"{merger_api.MERGE_REQUEST_ROOT}/B") - c = merger_api.get(f"{merger_api.MERGE_REQUEST_ROOT}/C") - d = merger_api.get(f"{merger_api.MERGE_REQUEST_ROOT}/D") + b = merger_api.get(f"{merger_api.REQUEST_ROOT}/B") + c = 
merger_api.get(f"{merger_api.REQUEST_ROOT}/C") + d = merger_api.get(f"{merger_api.REQUEST_ROOT}/D") b.state = MergeRequest.RUNNING merger_api.update(b) @@ -1090,7 +1097,7 @@ class TestMergerApi(ZooKeeperBaseTestCase): # DataWatch might be triggered for the correct event, but the cache # might still be outdated as the DataWatch that updates the cache # itself wasn't triggered yet. - cache = merger_api._cached_merge_requests + cache = merger_api._cached_requests for _ in iterate_timeout(30, "cache to be up-to-date"): if (cache[b.path].state == MergeRequest.RUNNING and cache[c.path].state == MergeRequest.RUNNING): @@ -1098,7 +1105,7 @@ class TestMergerApi(ZooKeeperBaseTestCase): # The lost_merges method should only return merges which are running # but not locked by any merger, in this case merge b - lost_merge_requests = list(merger_api.lostMergeRequests()) + lost_merge_requests = list(merger_api.lostRequests()) self.assertEqual(1, len(lost_merge_requests)) self.assertEqual(b.path, lost_merge_requests[0].path) @@ -1119,15 +1126,14 @@ class TestMergerApi(ZooKeeperBaseTestCase): # Simulate the client side client = MergerApi(self.zk_client) payload = {'merge': 'test'} - client.submit( + client.submit(MergeRequest( uuid='A', job_type=MergeRequest.MERGE, build_set_uuid='AA', tenant_name='tenant', pipeline_name='check', - params=payload, event_id='1', - ) + ), payload) # Simulate the server side server = MergerApi(self.zk_client, @@ -1143,7 +1149,7 @@ class TestMergerApi(ZooKeeperBaseTestCase): self.assertEqual(a.uuid, 'A') client.remove(a) - self._assertEmptyRoots() + self._assertEmptyRoots(client) class TestLocks(ZooKeeperBaseTestCase): diff --git a/zuul/executor/client.py b/zuul/executor/client.py index 815baf9b0b..36e6967fd6 100644 --- a/zuul/executor/client.py +++ b/zuul/executor/client.py @@ -27,7 +27,7 @@ from zuul.model import ( ) from zuul.zk.event_queues import PipelineResultEventQueue from zuul.zk.executor import ExecutorApi -from zuul.zk.exceptions import BuildRequestNotFound +from zuul.zk.exceptions import JobRequestNotFound from kazoo.exceptions import BadVersionError @@ -131,15 +131,16 @@ class ExecutorClient(object): # Fall back to the default zone executor_zone = None - build.build_request_ref = self.executor_api.submit( + request = BuildRequest( uuid=uuid, tenant_name=build.build_set.item.pipeline.tenant.name, pipeline_name=build.build_set.item.pipeline.name, - params=params, zone=executor_zone, event_id=item.event.zuul_event_id, - precedence=PRIORITY_MAP[pipeline.precedence], + precedence=PRIORITY_MAP[pipeline.precedence] ) + self.executor_api.submit(request, params) + build.build_request_ref = request.path def cancel(self, build): log = get_annotated_logger(self.log, build.zuul_event_id, @@ -224,7 +225,7 @@ class ExecutorClient(object): del self.builds[build.uuid] def cleanupLostBuildRequests(self): - for build_request in self.executor_api.lostBuildRequests(): + for build_request in self.executor_api.lostRequests(): try: self.cleanupLostBuildRequest(build_request) except Exception: @@ -241,7 +242,7 @@ class ExecutorClient(object): build_request.state = BuildRequest.COMPLETED try: self.executor_api.update(build_request) - except BuildRequestNotFound as e: + except JobRequestNotFound as e: self.log.warning("Could not complete build: %s", str(e)) return except BadVersionError: diff --git a/zuul/executor/server.py b/zuul/executor/server.py index 3177ecfcd1..84a50ad611 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -66,8 +66,9 @@ import zuul.model from 
zuul.nodepool import Nodepool from zuul.zk.event_queues import PipelineResultEventQueue from zuul.zk.components import ExecutorComponent -from zuul.zk.exceptions import BuildRequestNotFound -from zuul.zk.executor import BuildRequestEvent, ExecutorApi +from zuul.zk.exceptions import JobRequestNotFound +from zuul.zk.executor import ExecutorApi +from zuul.zk.job_request_queue import JobRequestEvent BUFFER_LINES_FOR_SYNTAX = 200 COMMANDS = ['stop', 'pause', 'unpause', 'graceful', 'verbose', @@ -3511,13 +3512,13 @@ class ExecutorServer(BaseMergeServer): # not we could avoid this ZK update. The cancel request can anyway # only be fulfilled by the executor that executes the job. So, if # that executor died, no other can pick up the request. - if build_event == BuildRequestEvent.CANCELED: + if build_event == JobRequestEvent.CANCELED: self.executor_api.fulfillCancel(build_request) self.stopJob(build_request) - elif build_event == BuildRequestEvent.RESUMED: + elif build_event == JobRequestEvent.RESUMED: self.executor_api.fulfillResume(build_request) self.resumeJob(build_request) - elif build_event == BuildRequestEvent.DELETED: + elif build_event == JobRequestEvent.DELETED: self.stopJob(build_request) def runBuildWorker(self): @@ -3558,8 +3559,8 @@ class ExecutorServer(BaseMergeServer): return build_request.state = BuildRequest.RUNNING - params = self.executor_api.getBuildParams(build_request) - self.executor_api.clearBuildParams(build_request) + params = self.executor_api.getParams(build_request) + self.executor_api.clearParams(build_request) # Directly update the build in ZooKeeper, so we don't # loop over and try to lock it again and again. self.executor_api.update(build_request) @@ -3809,7 +3810,7 @@ class ExecutorServer(BaseMergeServer): build_request.state = BuildRequest.PAUSED try: self.executor_api.update(build_request) - except BuildRequestNotFound as e: + except JobRequestNotFound as e: self.log.warning("Could not pause build: %s", str(e)) return @@ -3821,7 +3822,7 @@ class ExecutorServer(BaseMergeServer): build_request.state = BuildRequest.RUNNING try: self.executor_api.update(build_request) - except BuildRequestNotFound as e: + except JobRequestNotFound as e: self.log.warning("Could not resume build: %s", str(e)) return @@ -3864,7 +3865,7 @@ class ExecutorServer(BaseMergeServer): build_request.state = BuildRequest.COMPLETED try: self.executor_api.update(build_request) - except BuildRequestNotFound as e: + except JobRequestNotFound as e: self.log.warning("Could not complete build: %s", str(e)) return diff --git a/zuul/merger/client.py b/zuul/merger/client.py index 2687f839dd..f69a9f2c34 100644 --- a/zuul/merger/client.py +++ b/zuul/merger/client.py @@ -19,7 +19,7 @@ from zuul.lib.config import get_default from zuul.lib.logutil import get_annotated_logger from zuul.model import MergeRequest, PRECEDENCE_HIGH, PRECEDENCE_NORMAL from zuul.zk.merger import MergerApi -from zuul.zk.exceptions import MergeRequestNotFound +from zuul.zk.exceptions import JobRequestNotFound from kazoo.exceptions import BadVersionError @@ -64,17 +64,17 @@ class MergeClient(object): log = get_annotated_logger(self.log, event) log.debug("Submitting job %s with data %s", uuid, data) - return self.merger_api.submit( + request = MergeRequest( uuid=uuid, job_type=job_type, build_set_uuid=build_set_uuid, tenant_name=tenant_name, pipeline_name=pipeline_name, - params=data, event_id=event.zuul_event_id if event else None, - precedence=precedence, - needs_result=needs_result, + precedence=precedence ) + return 
self.merger_api.submit(request, data, + needs_result=needs_result) def mergeChanges(self, items, build_set, files=None, dirs=None, repo_state=None, precedence=PRECEDENCE_NORMAL, @@ -128,7 +128,7 @@ class MergeClient(object): return job def cleanupLostMergeRequests(self): - for merge_request in self.merger_api.lostMergeRequests(): + for merge_request in self.merger_api.lostRequests(): try: self.cleanupLostMergeRequest(merge_request) except Exception: @@ -142,7 +142,7 @@ class MergeClient(object): # TODO (felix): If we want to optimize ZK requests, we could only # call the remove() here. self.merger_api.remove(merge_request) - except MergeRequestNotFound as e: + except JobRequestNotFound as e: self.log.warning("Could not complete merge: %s", str(e)) return except BadVersionError: diff --git a/zuul/merger/server.py b/zuul/merger/server.py index 22ce1a9e7b..a82b1ccb96 100644 --- a/zuul/merger/server.py +++ b/zuul/merger/server.py @@ -97,9 +97,6 @@ class BaseMergeServer(metaclass=ABCMeta): ) self.merger_loop_wake_event = threading.Event() - self.merger_cleanup_election = self.zk_client.client.Election( - f"{MergerApi.MERGE_REQUEST_ROOT}/election" - ) self.merger_api = MergerApi( self.zk_client, @@ -172,7 +169,6 @@ class BaseMergeServer(metaclass=ABCMeta): self.log.debug('Stopping merger') self._merger_running = False self.merger_loop_wake_event.set() - self.merger_cleanup_election.cancel() self.zk_client.disconnect() def join(self): @@ -215,8 +211,8 @@ class BaseMergeServer(metaclass=ABCMeta): return merge_request.state = MergeRequest.RUNNING - params = self.merger_api.getMergeParams(merge_request) - self.merger_api.clearMergeParams(merge_request) + params = self.merger_api.getParams(merge_request) + self.merger_api.clearParams(merge_request) # Directly update the merge request in ZooKeeper, so we don't loop over # and try to lock it again and again. 
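Taken together, the merger's consumption path under the new API is roughly as follows (a sketch; lock() with blocking=False is assumed to behave as in the code being replaced):

    for merge_request in merger_api.next():          # yields REQUESTED entries
        if not merger_api.lock(merge_request, blocking=False):
            continue
        merge_request.state = MergeRequest.RUNNING
        params = merger_api.getParams(merge_request)
        merger_api.clearParams(merge_request)
        merger_api.update(merge_request)
        # ... execute the job, then reportResult()/remove()/unlock()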
self.merger_api.update(merge_request) diff --git a/zuul/model.py b/zuul/model.py index 5d45b2f8f3..b1047474f4 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -2069,130 +2069,31 @@ class JobGraph(object): @total_ordering -class MergeRequest: - +class JobRequest: # States: + UNSUBMITTED = "unsubmitted" REQUESTED = "requested" HOLD = "hold" # Used by tests to stall processing RUNNING = "running" COMPLETED = "completed" - ALL_STATES = (REQUESTED, HOLD, RUNNING, COMPLETED) + ALL_STATES = (UNSUBMITTED, REQUESTED, HOLD, RUNNING, COMPLETED) - # Types: - MERGE = "merge" - CAT = "cat" - REF_STATE = "refstate" - FILES_CHANGES = "fileschanges" - - def __init__( - self, - uuid, - state, - job_type, - precedence, - build_set_uuid, - tenant_name, - pipeline_name, - event_id - ): + def __init__(self, uuid, precedence=None, state=None, result_path=None): self.uuid = uuid - self.state = state - self.job_type = job_type - self.precedence = precedence - self.build_set_uuid = build_set_uuid - self.tenant_name = tenant_name - self.pipeline_name = pipeline_name - self.event_id = event_id - - # Path to the future result if requested - self.result_path = None - - # ZK related data - self.path = None - self._zstat = None - self.lock = None - - def toDict(self): - return { - "uuid": self.uuid, - "state": self.state, - "job_type": self.job_type, - "precedence": self.precedence, - "build_set_uuid": self.build_set_uuid, - "tenant_name": self.tenant_name, - "pipeline_name": self.pipeline_name, - "result_path": self.result_path, - "event_id": self.event_id, - } - - @classmethod - def fromDict(cls, data): - job = cls( - data["uuid"], - data["state"], - data["job_type"], - data["precedence"], - data["build_set_uuid"], - data["tenant_name"], - data["pipeline_name"], - data["event_id"], - ) - job.result_path = data.get("result_path") - return job - - def __lt__(self, other): - # Sort jobs by precedence and their creation time in ZooKeeper in - # ascending order to prevent older jobs from starving. 
- if self.precedence == other.precedence: - if self._zstat and other._zstat: - return self._zstat.ctime < other._zstat.ctime - return self.uuid < other.uuid - return self.precedence < other.precedence - - def __eq__(self, other): - same_prec = self.precedence == other.precedence - if self._zstat and other._zstat: - same_ctime = self._zstat.ctime == other._zstat.ctime + if precedence is None: + self.precedence = 0 else: - same_ctime = self.uuid == other.uuid + self.precedence = precedence - return same_prec and same_ctime + if state is None: + self.state = self.UNSUBMITTED + else: + self.state = state + # Path to the future result if requested + self.result_path = result_path - def __repr__(self): - return ( - f"" - ) - - -@total_ordering -class BuildRequest: - """A request for a build in a specific zone""" - - # States: - # Waiting - REQUESTED = 'requested' - HOLD = 'hold' # Used by tests to stall processing - # Running - RUNNING = 'running' - PAUSED = 'paused' - # Finished - COMPLETED = 'completed' - - ALL_STATES = (REQUESTED, HOLD, RUNNING, PAUSED, COMPLETED) - - def __init__(self, uuid, state, precedence, zone, - tenant_name, pipeline_name, event_id): - self.uuid = uuid - self.state = state - self.precedence = precedence - self.zone = zone - self.tenant_name = tenant_name - self.pipeline_name = pipeline_name - self.event_id = event_id - - # ZK related data + # ZK related data not serialized self.path = None self._zstat = None self.lock = None @@ -2202,34 +2103,27 @@ class BuildRequest: "uuid": self.uuid, "state": self.state, "precedence": self.precedence, - "zone": self.zone, - "tenant_name": self.tenant_name, - "pipeline_name": self.pipeline_name, - "event_id": self.event_id, + "result_path": self.result_path, } @classmethod def fromDict(cls, data): - build_request = cls( + return cls( data["uuid"], - data["state"], - data["precedence"], - data["zone"], - data["tenant_name"], - data["pipeline_name"], - data["event_id"], + precedence=data["precedence"], + state=data["state"], + result_path=data["result_path"] ) - return build_request - def __lt__(self, other): - # Sort build requests by precedence and their creation time in - # ZooKeeper in ascending order to prevent older builds from starving. + # Sort requests by precedence and their creation time in + # ZooKeeper in ascending order to prevent older requests from + # starving. if self.precedence == other.precedence: if self._zstat and other._zstat: return self._zstat.ctime < other._zstat.ctime # NOTE (felix): As the _zstat should always be set when retrieving - # the build request from ZooKeeper, this branch shouldn't matter + # the request from ZooKeeper, this branch shouldn't matter # much. It's just there, because the _zstat could - theoretically - # be None. 
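A small sketch of the defaults established by the base-class __init__ above (the attribute values follow directly from that code):

    req = JobRequest('some-uuid')
    req.state        # JobRequest.UNSUBMITTED until submit() moves it along
    req.precedence   # 0 when no precedence is given
    req.result_path  # None unless a result was requested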
return self.uuid < other.uuid @@ -2241,9 +2135,103 @@ class BuildRequest: same_ctime = self._zstat.ctime == other._zstat.ctime else: same_ctime = self.uuid == other.uuid - return same_prec and same_ctime + def __repr__(self): + return (f"") + + +class MergeRequest(JobRequest): + + # Types: + MERGE = "merge" + CAT = "cat" + REF_STATE = "refstate" + FILES_CHANGES = "fileschanges" + + def __init__(self, uuid, job_type, build_set_uuid, tenant_name, + pipeline_name, event_id, precedence=None, state=None, + result_path=None): + super().__init__(uuid, precedence, state, result_path) + self.job_type = job_type + self.build_set_uuid = build_set_uuid + self.tenant_name = tenant_name + self.pipeline_name = pipeline_name + self.event_id = event_id + + def toDict(self): + d = super().toDict() + d.update({ + "job_type": self.job_type, + "build_set_uuid": self.build_set_uuid, + "tenant_name": self.tenant_name, + "pipeline_name": self.pipeline_name, + "event_id": self.event_id, + }) + return d + + @classmethod + def fromDict(cls, data): + return cls( + data["uuid"], + data["job_type"], + data["build_set_uuid"], + data["tenant_name"], + data["pipeline_name"], + data["event_id"], + precedence=data["precedence"], + state=data["state"], + result_path=data["result_path"] + ) + + def __repr__(self): + return ( + f"" + ) + + +class BuildRequest(JobRequest): + """A request for a build in a specific zone""" + + # States: + PAUSED = 'paused' + + ALL_STATES = JobRequest.ALL_STATES + (PAUSED,) # type: ignore + + def __init__(self, uuid, zone, + tenant_name, pipeline_name, event_id, + precedence=None, state=None, result_path=None): + super().__init__(uuid, precedence, state, result_path) + self.zone = zone + self.tenant_name = tenant_name + self.pipeline_name = pipeline_name + self.event_id = event_id + + def toDict(self): + d = super().toDict() + d.update({ + "zone": self.zone, + "tenant_name": self.tenant_name, + "pipeline_name": self.pipeline_name, + "event_id": self.event_id, + }) + return d + + @classmethod + def fromDict(cls, data): + return cls( + data["uuid"], + data["zone"], + data["tenant_name"], + data["pipeline_name"], + data["event_id"], + precedence=data["precedence"], + state=data["state"], + result_path=data["result_path"] + ) + def __repr__(self): return ( f" build request - self._cached_build_requests = {} + self.zone_queues = DefaultKeyDict( + lambda zone: ExecutorQueue( + self.client, + self._getZoneRoot(zone), + self._getInitialState, + self.request_callback, + self.event_callback)) - self.kazoo_client.ensure_path(self.BUILD_PARAMS_ROOT) if zone_filter is None: self.registerAllZones() else: for zone in zone_filter: - self.registerZone(zone) + # For the side effect of creating a queue + self.zone_queues[zone] - @property - def initial_state(self): - # This supports holding build requests in tests + def _getInitialState(self): return BuildRequest.REQUESTED def _getZoneRoot(self, zone): if zone is None: - return "/".join([self.BUILD_REQUEST_ROOT, 'unzoned']) + return self.unzoned_root else: - return "/".join([self.BUILD_REQUEST_ROOT, 'zones', zone]) - - def registerZone(self, zone): - if zone in self._watched_zones: - return - zone_root = self._getZoneRoot(zone) - self.log.debug("Registering for zone %s at %s", zone, zone_root) - self.kazoo_client.ensure_path(zone_root) - - self.kazoo_client.ChildrenWatch( - zone_root, self._makeBuildRequestWatcher(zone_root), - send_event=True - ) - self._watched_zones.add(zone) + return f"{self.zones_root}/{zone}" def registerAllZones(self): - 
self.kazoo_client.ensure_path(self.BUILD_REQUEST_ROOT) - # Register a child watch that listens to new zones and automatically # registers to them. def watch_zones(children): for zone in children: - self.registerZone(zone) + # For the side effect of creating a queue + self.zone_queues[zone] - zones_root = "/".join([self.BUILD_REQUEST_ROOT, 'zones']) - self.kazoo_client.ensure_path(zones_root) - self.kazoo_client.ChildrenWatch(zones_root, watch_zones) - self.registerZone(None) - - def _makeBuildStateWatcher(self, path): - def watch(data, stat, event=None): - return self._watchBuildState(path, data, stat, event) - return watch - - def _watchBuildState(self, path, data, stat, event=None): - if not event or event.type == EventType.CHANGED: - # Don't process change events w/o any data. This can happen when - # a "slow" change watch tried to retrieve the data of a znode that - # was deleted in the meantime. - if data is None: - return - # As we already get the data and the stat value, we can directly - # use it without asking ZooKeeper for the data again. - content = self._bytesToDict(data) - if not content: - return - - # We need this one for the HOLD -> REQUESTED check further down - old_build_request = self._cached_build_requests.get(path) - - build_request = BuildRequest.fromDict(content) - build_request.path = path - build_request._zstat = stat - self._cached_build_requests[path] = build_request - - # NOTE (felix): This is a test-specific condition: For test cases - # which are using hold_jobs_in_queue the state change on the build - # request from HOLD to REQUESTED is done outside of the executor. - # Thus, we must also set the wake event (the callback) so the - # executor can pick up those builds after they are released. To not - # cause a thundering herd problem in production for each cache - # update, the callback is only called under this very specific - # condition that can only occur in the tests. - if ( - self.build_request_callback - and old_build_request - and old_build_request.state == BuildRequest.HOLD - and build_request.state == BuildRequest.REQUESTED - ): - self.build_request_callback() - - elif event.type == EventType.DELETED: - build_request = self._cached_build_requests.get(path) - with suppress(KeyError): - del self._cached_build_requests[path] - - if build_request and self.build_event_callback: - self.build_event_callback( - build_request, BuildRequestEvent.DELETED - ) - - # Return False to stop the datawatch as the build got deleted. - return False - - def _makeBuildRequestWatcher(self, path): - def watch(build_requests, event=None): - return self._watchBuildRequests(path, build_requests, event) - return watch - - def _watchBuildRequests(self, path, build_requests, event=None): - # The build_requests list always contains all active children. Thus, we - # first have to find the new ones by calculating the delta between the - # build_requests list and our current cache entries. - # NOTE (felix): We could also use this list to determine the deleted - # build requests, but it's easier to do this in the DataWatch for the - # single build request instead. Otherwise we have to deal with race - # conditions between the children and the data watch as one watch might - # update a cache entry while the other tries to remove it. 
- - build_request_paths = { - f"{path}/{uuid}" for uuid in build_requests - } - - new_build_requests = build_request_paths - set( - self._cached_build_requests.keys() - ) - - for req_path in new_build_requests: - ExistingDataWatch(self.kazoo_client, - req_path, - self._makeBuildStateWatcher(req_path)) - - # Notify the user about new build requests if a callback is provided, - # but only if there are new requests (we don't want to fire on the - # initial callback from kazoo from registering the datawatch). - if new_build_requests and self.build_request_callback: - self.build_request_callback() - - def _iterBuildRequests(self): - # As the entries in the cache dictionary are added and removed via - # data and children watches, we can't simply iterate over it in here, - # as the values might change during iteration. - for key in list(self._cached_build_requests.keys()): - try: - build_request = self._cached_build_requests[key] - except KeyError: - continue - yield build_request - - def inState(self, *states): - if not states: - # If no states are provided, build a tuple containing all available - # ones to always match. We need a tuple to be compliant to the - # type of *states above. - states = BuildRequest.ALL_STATES - - build_requests = list( - filter(lambda b: b.state in states, self._iterBuildRequests()) - ) - - # Sort the list of builds by precedence and their creation time in - # ZooKeeper in ascending order to prevent older builds from starving. - return (b for b in sorted(build_requests)) - - def next(self): - yield from self.inState(BuildRequest.REQUESTED) - - def submit(self, uuid, tenant_name, pipeline_name, params, zone, - event_id, precedence=200): - log = get_annotated_logger(self.log, event=None, build=uuid) - - zone_root = self._getZoneRoot(zone) - path = "/".join([zone_root, uuid]) - - build_request = BuildRequest( - uuid, - self.initial_state, - precedence, - zone, - tenant_name, - pipeline_name, - event_id, - ) - - log.debug("Submitting build request to ZooKeeper %s", build_request) - - self.kazoo_client.ensure_path(zone_root) - - params_path = self._getParamsPath(uuid) - with sharding.BufferedShardWriter( - self.kazoo_client, params_path) as stream: - stream.write(self._dictToBytes(params)) - - return self.kazoo_client.create( - path, self._dictToBytes(build_request.toDict())) - - # We use child nodes here so that we don't need to lock the build - # request node. 
- def requestResume(self, build_request): - self.kazoo_client.ensure_path(f"{build_request.path}/resume") - - def requestCancel(self, build_request): - self.kazoo_client.ensure_path(f"{build_request.path}/cancel") - - def fulfillResume(self, build_request): - self.kazoo_client.delete(f"{build_request.path}/resume") - - def fulfillCancel(self, build_request): - self.kazoo_client.delete(f"{build_request.path}/cancel") - - def update(self, build_request): - log = get_annotated_logger( - self.log, event=None, build=build_request.uuid - ) - log.debug("Updating build request %s", build_request) - - if build_request._zstat is None: - log.debug( - "Cannot update build request %s: Missing version information.", - build_request.uuid, - ) - return - try: - zstat = self.kazoo_client.set( - build_request.path, - self._dictToBytes(build_request.toDict()), - version=build_request._zstat.version, - ) - # Update the zstat on the item after updating the ZK node - build_request._zstat = zstat - except NoNodeError: - raise BuildRequestNotFound( - f"Could not update {build_request.path}" - ) - - def get(self, path): - """Get a build request - - Note: do not mix get with iteration; iteration returns cached - BuildRequests while get returns a newly created object each - time. If you lock a BuildRequest, you must use the same - object to unlock it. - - """ - - try: - data, zstat = self.kazoo_client.get(path) - except NoNodeError: - return None - - if not data: - return None - - content = self._bytesToDict(data) - - build_request = BuildRequest.fromDict(content) - build_request.path = path - build_request._zstat = zstat - - return build_request - - def remove(self, build_request): - log = get_annotated_logger( - self.log, event=None, build=build_request.uuid - ) - log.debug("Removing build request %s", build_request) - try: - # As the build node might contain children (result, data, ...) we - # must delete it recursively. - self.kazoo_client.delete(build_request.path, recursive=True) - except NoNodeError: - # Nothing to do if the node is already deleted - pass - self.clearBuildParams(build_request) - try: - # Delete the lock parent node as well. - path = "/".join([self.LOCK_ROOT, build_request.uuid]) - self.kazoo_client.delete(path, recursive=True) - except NoNodeError: - pass - - def _watchBuildEvents(self, actions, event=None): - if event is None: - return - - build_event = None - if "cancel" in actions: - build_event = BuildRequestEvent.CANCELED - elif "resume" in actions: - build_event = BuildRequestEvent.RESUMED - - if build_event and self.build_event_callback: - build_request = self._cached_build_requests.get(event.path) - self.build_event_callback(build_request, build_event) - - def lock(self, build_request, blocking=True, timeout=None): - # Keep the lock nodes in a different path to keep the build request - # subnode structure clean. Otherwise, the lock node will be in between - # the cancel and resume requests. - path = "/".join([self.LOCK_ROOT, build_request.uuid]) - have_lock = False - lock = None - try: - lock = Lock(self.kazoo_client, path) - have_lock = lock.acquire(blocking, timeout) - except LockTimeout: - have_lock = False - self.log.error( - "Timeout trying to acquire lock: %s", build_request.uuid - ) - - # If we aren't blocking, it's possible we didn't get the lock - # because someone else has it. 
- if not have_lock: - return False - - if not self.kazoo_client.exists(build_request.path): - lock.release() - self.log.error( - "Build not found for locking: %s", build_request.uuid - ) - - # We may have just re-created the lock parent node just - # after the scheduler deleted it; therefore we should - # (re-) delete it. - try: - # Delete the lock parent node as well. - path = "/".join([self.LOCK_ROOT, build_request.uuid]) - self.kazoo_client.delete(path, recursive=True) - except NoNodeError: - pass - - return False - - build_request.lock = lock - - # Create the children watch to listen for cancel/resume actions on this - # build request. - self.kazoo_client.ChildrenWatch( - build_request.path, self._watchBuildEvents, send_event=True - ) - return True - - def unlock(self, build_request): - if build_request.lock is None: - self.log.warning( - "BuildRequest %s does not hold a lock", build_request - ) - else: - build_request.lock.release() - build_request.lock = None - - def isLocked(self, build_request): - path = "/".join([self.LOCK_ROOT, build_request.uuid]) - lock = Lock(self.kazoo_client, path) - is_locked = len(lock.contenders()) > 0 - return is_locked - - def lostBuildRequests(self): - # Get a list of builds which are running but not locked by any executor - yield from filter( - lambda b: not self.isLocked(b), - self.inState(BuildRequest.RUNNING, BuildRequest.PAUSED), - ) + self.client.client.ChildrenWatch(self.zones_root, watch_zones) + # For the side effect of creating a queue + self.zone_queues[None] def _getAllZones(self): # Get a list of all zones without using the cache. try: # Get all available zones from ZooKeeper - zones = self.kazoo_client.get_children( - '/'.join([self.BUILD_REQUEST_ROOT, 'zones'])) + zones = self.client.client.get_children(self.zones_root) zones.append(None) except NoNodeError: zones = [None] return zones - def _getAllBuildIds(self, zones=None): - # Get a list of all build uuids without using the cache. - if zones is None: - zones = self._getAllZones() + # Override JobRequestQueue methods to accomodate the zone dict. - all_builds = set() - for zone in zones: - try: - zone_path = self._getZoneRoot(zone) - all_builds.update(self.kazoo_client.get_children(zone_path)) - except NoNodeError: - # Skip this zone as it doesn't have any builds - continue - return all_builds + def inState(self, *states): + requests = [] + for queue in self.zone_queues.values(): + requests.extend(queue.inState(*states)) + return sorted(requests) - def _findLostParams(self, age): - # Get data nodes which are older than the specified age (we - # don't want to delete nodes which are just being written - # slowly). - # Convert to MS - now = int(time.time() * 1000) - age = age * 1000 - data_nodes = dict() - for data_id in self.kazoo_client.get_children(self.BUILD_PARAMS_ROOT): - data_path = self._getParamsPath(data_id) - data_zstat = self.kazoo_client.exists(data_path) - if now - data_zstat.mtime > age: - data_nodes[data_id] = data_path + def next(self): + yield from self.inState(BuildRequest.REQUESTED) - # If there are no candidate data nodes, we don't need to - # filter them by known requests. 
- if not data_nodes: - return data_nodes.values() + def submit(self, request, params): + return self.zone_queues[request.zone].submit(request, params) - # Remove current request uuids - for request_id in self._getAllBuildIds(): - if request_id in data_nodes: - del data_nodes[request_id] + def update(self, request): + return self.zone_queues[request.zone].update(request) - # Return the paths - return data_nodes.values() + def reportResult(self, request, result): + return self.zone_queues[request.zone].reportResult(request) + + def get(self, path): + if path.startswith(self.zones_root): + zone = path[len(self.zones_root):] + else: + zone = None + return self.zone_queues[zone].get(path) + + def remove(self, request): + return self.zone_queues[request.zone].remove(request) + + def requestResume(self, request): + return self.zone_queues[request.zone].requestResume(request) + + def requestCancel(self, request): + return self.zone_queues[request.zone].requestCancel(request) + + def fulfillResume(self, request): + return self.zone_queues[request.zone].fulfillResume(request) + + def fulfillCancel(self, request): + return self.zone_queues[request.zone].fulfillCancel(request) + + def lock(self, request, *args, **kw): + return self.zone_queues[request.zone].lock(request, *args, **kw) + + def unlock(self, request): + return self.zone_queues[request.zone].unlock(request) + + def isLocked(self, request): + return self.zone_queues[request.zone].isLocked(request) + + def lostRequests(self): + for queue in self.zone_queues.values(): + yield from queue.lostRequests() def cleanup(self, age=300): - # Delete build request params which are not associated with - # any current build requests. Note, this does not clean up - # lost build requests themselves; the executor client takes - # care of that. - try: - for path in self._findLostParams(age): - try: - self.log.error("Removing build request params: %s", path) - self.kazoo_client.delete(path, recursive=True) - except Exception: - self.log.execption( - "Unable to delete build request params %s", path) - except Exception: - self.log.exception( - "Error cleaning up build request queue %s", self) + for queue in self.zone_queues.values(): + queue.cleanup(age) - @staticmethod - def _bytesToDict(data): - return json.loads(data.decode("utf-8")) + def clearParams(self, request): + return self.zone_queues[request.zone].clearParams(request) - @staticmethod - def _dictToBytes(data): - # The custom json_dumps() will also serialize MappingProxyType objects - return json_dumps(data).encode("utf-8") + def getParams(self, request): + return self.zone_queues[request.zone].getParams(request) - def _getParamsPath(self, build_uuid): - return '/'.join([self.BUILD_PARAMS_ROOT, build_uuid]) - - def clearBuildParams(self, build_request): - """Erase the build parameters from ZK to save space""" - self.kazoo_client.delete(self._getParamsPath(build_request.uuid), - recursive=True) - - def getBuildParams(self, build_request): - """Return the parameters for a build request, if they exist. - - Once a build request is accepted by an executor, the params - may be erased from ZK; this will return None in that case. 
- - """ - with sharding.BufferedShardReader( - self.kazoo_client, - self._getParamsPath(build_request.uuid)) as stream: - data = stream.read() - if not data: - return None - return self._bytesToDict(data) + def _getAllRequestIds(self): + ret = [] + for queue in self.zone_queues.values(): + ret.extend(queue._getAllRequestIds()) + return ret diff --git a/zuul/zk/job_request_queue.py b/zuul/zk/job_request_queue.py new file mode 100644 index 0000000000..c044eeaf87 --- /dev/null +++ b/zuul/zk/job_request_queue.py @@ -0,0 +1,528 @@ +# Copyright 2021 BMW Group +# Copyright 2021 Acme Gating, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import logging +import time +from contextlib import suppress +from enum import Enum + +from kazoo.exceptions import LockTimeout, NoNodeError +from kazoo.protocol.states import EventType +from kazoo.recipe.lock import Lock + +from zuul.lib.jsonutil import json_dumps +from zuul.lib.logutil import get_annotated_logger +from zuul.model import JobRequest +from zuul.zk import ZooKeeperSimpleBase, sharding +from zuul.zk.event_queues import JobResultFuture +from zuul.zk.exceptions import JobRequestNotFound +from zuul.zk.vendor.watchers import ExistingDataWatch + + +class JobRequestEvent(Enum): + CREATED = 0 + UPDATED = 1 + RESUMED = 2 + CANCELED = 3 + DELETED = 4 + + +class JobRequestQueue(ZooKeeperSimpleBase): + log = logging.getLogger("zuul.JobRequestQueue") + request_class = JobRequest + + def __init__(self, client, root, + request_callback=None, event_callback=None): + super().__init__(client) + + self.REQUEST_ROOT = f"{root}/requests" + self.LOCK_ROOT = f"{root}/locks" + self.PARAM_ROOT = f"{root}/params" + self.RESULT_ROOT = f"{root}/results" + self.RESULT_DATA_ROOT = f"{root}/result-data" + self.WAITER_ROOT = f"{root}/waiters" + + self.request_callback = request_callback + self.event_callback = event_callback + + # path -> request + self._cached_requests = {} + + self.kazoo_client.ensure_path(self.REQUEST_ROOT) + self.kazoo_client.ensure_path(self.PARAM_ROOT) + self.kazoo_client.ensure_path(self.RESULT_ROOT) + self.kazoo_client.ensure_path(self.RESULT_DATA_ROOT) + self.kazoo_client.ensure_path(self.WAITER_ROOT) + self.kazoo_client.ensure_path(self.LOCK_ROOT) + + self.register() + + @property + def initial_state(self): + # This supports holding requests in tests + return self.request_class.REQUESTED + + def register(self): + # Register a child watch that listens for new requests + self.kazoo_client.ChildrenWatch( + self.REQUEST_ROOT, + self._makeRequestWatcher(self.REQUEST_ROOT), + send_event=True, + ) + + def _makeRequestWatcher(self, path): + def watch(requests, event=None): + return self._watchRequests(path, requests) + return watch + + def _watchRequests(self, path, requests): + # The requests list always contains all active children. Thus, + # we first have to find the new ones by calculating the delta + # between the requests list and our current cache entries. 
+        # NOTE (felix): We could also use this list to determine the
+        # deleted requests, but it's easier to do this in the
+        # DataWatch for the single request instead. Otherwise we have
+        # to deal with race conditions between the children and the
+        # data watch as one watch might update a cache entry while the
+        # other tries to remove it.
+
+        request_paths = {
+            f"{path}/{uuid}" for uuid in requests
+        }
+
+        new_requests = request_paths - set(
+            self._cached_requests.keys()
+        )
+
+        for req_path in new_requests:
+            ExistingDataWatch(self.kazoo_client,
+                              req_path,
+                              self._makeStateWatcher(req_path))
+
+        # Notify the user about new requests if a callback is provided.
+        # When we register the data watch, we will receive an initial
+        # callback immediately. The list of children may be empty in
+        # that case, so we should not fire our callback since there
+        # are no requests to handle.
+
+        if new_requests and self.request_callback:
+            self.request_callback()
+
+    def _makeStateWatcher(self, path):
+        def watch(data, stat, event=None):
+            return self._watchState(path, data, stat, event)
+        return watch
+
+    def _watchState(self, path, data, stat, event=None):
+        if not event or event.type == EventType.CHANGED:
+            # Don't process change events w/o any data. This can happen when a
+            # "slow" change watch tried to retrieve the data of a znode that
+            # was deleted in the meantime.
+            if not data:
+                return
+            # As we already get the data and the stat value, we can directly
+            # use it without asking ZooKeeper for the data again.
+            content = self._bytesToDict(data)
+            if not content:
+                return
+
+            # We need this one for the HOLD -> REQUESTED check further down
+            old_request = self._cached_requests.get(path)
+
+            request = self.request_class.fromDict(content)
+            request.path = path
+            request._zstat = stat
+            self._cached_requests[path] = request
+
+            # NOTE (felix): This is a test-specific condition: For test cases
+            # which are using hold_*_jobs_in_queue the state change on the
+            # request from HOLD to REQUESTED is done outside of the server.
+            # Thus, we must also set the wake event (the callback) so the
+            # server can pick up those jobs after they are released. To not
+            # cause a thundering herd problem in production for each cache
+            # update, the callback is only called under this very specific
+            # condition that can only occur in the tests.
+            if (
+                self.request_callback
+                and old_request
+                and old_request.state == self.request_class.HOLD
+                and request.state == self.request_class.REQUESTED
+            ):
+                self.request_callback()
+
+        elif event.type == EventType.DELETED:
+            request = self._cached_requests.get(path)
+            with suppress(KeyError):
+                del self._cached_requests[path]
+
+            if request and self.event_callback:
+                self.event_callback(request, JobRequestEvent.DELETED)
+
+            # Return False to stop the datawatch as the build got deleted.
+            return False
+
+    def inState(self, *states):
+        if not states:
+            # If no states are provided, build a tuple containing all available
+            # ones to always match. We need a tuple to be compliant to the
+            # type of *states above.
+            states = self.request_class.ALL_STATES
+
+        requests = [
+            req for req in self._cached_requests.values()
+            if req.state in states
+        ]
+
+        # Sort the list of requests by precedence and their creation time
+        # in ZooKeeper in ascending order to prevent older requests from
+        # starving.
+ return sorted(requests) + + def next(self): + yield from self.inState(self.request_class.REQUESTED) + + def submit(self, request, params, needs_result=False): + log = get_annotated_logger(self.log, event=request.event_id) + + path = "/".join([self.REQUEST_ROOT, request.uuid]) + request.path = path + + assert isinstance(request, self.request_class) + assert request.state == self.request_class.UNSUBMITTED + request.state = self.initial_state + + result = None + + # If a result is needed, create the result_path with the same + # UUID and store it on the request, so the server can store + # the result there. + if needs_result: + result_path = "/".join( + [self.RESULT_ROOT, request.uuid] + ) + waiter_path = "/".join( + [self.WAITER_ROOT, request.uuid] + ) + self.kazoo_client.create(waiter_path, ephemeral=True) + result = JobResultFuture(self.client, result_path, waiter_path) + request.result_path = result_path + + log.debug("Submitting job request to ZooKeeper %s", request) + + params_path = self._getParamsPath(request.uuid) + with sharding.BufferedShardWriter( + self.kazoo_client, params_path + ) as stream: + stream.write(self._dictToBytes(params)) + + self.kazoo_client.create(path, self._dictToBytes(request.toDict())) + + return result + + def update(self, request): + log = get_annotated_logger( + self.log, event=request.event_id, build=request.uuid + ) + log.debug("Updating request %s", request) + + if request._zstat is None: + log.debug( + "Cannot update request %s: Missing version information.", + request.uuid, + ) + return + try: + zstat = self.kazoo_client.set( + request.path, + self._dictToBytes(request.toDict()), + version=request._zstat.version, + ) + # Update the zstat on the item after updating the ZK node + request._zstat = zstat + except NoNodeError: + raise JobRequestNotFound( + f"Could not update {request.path}" + ) + + def reportResult(self, request, result): + # Write the result data first since it may be multiple nodes. + result_data_path = "/".join( + [self.RESULT_DATA_ROOT, request.uuid] + ) + with sharding.BufferedShardWriter( + self.kazoo_client, result_data_path) as stream: + stream.write(self._dictToBytes(result)) + + # Then write the result node to signify it's ready. + data = {'result_data_path': result_data_path} + self.kazoo_client.create(request.result_path, + self._dictToBytes(data)) + + def get(self, path): + """Get a request + + Note: do not mix get with iteration; iteration returns cached + requests while get returns a newly created object each + time. If you lock a request, you must use the same object to + unlock it. + + """ + try: + data, zstat = self.kazoo_client.get(path) + except NoNodeError: + return None + + if not data: + return None + + content = self._bytesToDict(data) + + request = self.request_class.fromDict(content) + request.path = path + request._zstat = zstat + + return request + + def remove(self, request): + self.log.debug("Removing request %s", request) + try: + self.kazoo_client.delete(request.path, recursive=True) + except NoNodeError: + # Nothing to do if the node is already deleted + pass + self.clearParams(request) + try: + # Delete the lock parent node as well + path = "/".join([self.LOCK_ROOT, request.uuid]) + self.kazoo_client.delete(path, recursive=True) + except NoNodeError: + pass + + # We use child nodes here so that we don't need to lock the + # request node. 
+ def requestResume(self, request): + self.kazoo_client.ensure_path(f"{request.path}/resume") + + def requestCancel(self, request): + self.kazoo_client.ensure_path(f"{request.path}/cancel") + + def fulfillResume(self, request): + self.kazoo_client.delete(f"{request.path}/resume") + + def fulfillCancel(self, request): + self.kazoo_client.delete(f"{request.path}/cancel") + + def _watchEvents(self, actions, event=None): + if event is None: + return + + job_event = None + if "cancel" in actions: + job_event = JobRequestEvent.CANCELED + elif "resume" in actions: + job_event = JobRequestEvent.RESUMED + + if job_event: + request = self._cached_requests.get(event.path) + self.event_callback(request, job_event) + + def lock(self, request, blocking=True, timeout=None): + path = "/".join([self.LOCK_ROOT, request.uuid]) + have_lock = False + lock = None + try: + lock = Lock(self.kazoo_client, path) + have_lock = lock.acquire(blocking, timeout) + except LockTimeout: + have_lock = False + self.log.error( + "Timeout trying to acquire lock: %s", request.uuid + ) + + # If we aren't blocking, it's possible we didn't get the lock + # because someone else has it. + if not have_lock: + return False + + if not self.kazoo_client.exists(request.path): + lock.release() + self.log.error( + "Request not found for locking: %s", request.uuid + ) + + # We may have just re-created the lock parent node just after the + # scheduler deleted it; therefore we should (re-) delete it. + try: + # Delete the lock parent node as well. + path = "/".join([self.LOCK_ROOT, request.uuid]) + self.kazoo_client.delete(path, recursive=True) + except NoNodeError: + pass + + return False + + request.lock = lock + + # Create the children watch to listen for cancel/resume actions on this + # build request. + if self.event_callback: + self.kazoo_client.ChildrenWatch( + request.path, self._watchEvents, send_event=True) + + return True + + def unlock(self, request): + if request.lock is None: + self.log.warning( + "Request %s does not hold a lock", request + ) + else: + request.lock.release() + request.lock = None + + def isLocked(self, request): + path = "/".join([self.LOCK_ROOT, request.uuid]) + lock = Lock(self.kazoo_client, path) + is_locked = len(lock.contenders()) > 0 + return is_locked + + def lostRequests(self): + # Get a list of requests which are running but not locked by + # any client. + yield from filter( + lambda b: not self.isLocked(b), + self.inState(self.request_class.RUNNING), + ) + + def _getAllRequestIds(self): + # Get a list of all request ids without using the cache. + return self.kazoo_client.get_children(self.REQUEST_ROOT) + + def _findLostParams(self, age): + # Get data nodes which are older than the specified age (we + # don't want to delete nodes which are just being written + # slowly). + # Convert to MS + now = int(time.time() * 1000) + age = age * 1000 + data_nodes = dict() + for data_id in self.kazoo_client.get_children(self.PARAM_ROOT): + data_path = self._getParamsPath(data_id) + data_zstat = self.kazoo_client.exists(data_path) + if now - data_zstat.mtime > age: + data_nodes[data_id] = data_path + + # If there are no candidate data nodes, we don't need to + # filter them by known requests. 
+        if not data_nodes:
+            return data_nodes.values()
+
+        # Remove current request uuids
+        for request_id in self._getAllRequestIds():
+            if request_id in data_nodes:
+                del data_nodes[request_id]
+
+        # Return the paths
+        return data_nodes.values()
+
+    def _findLostResults(self):
+        # Get a list of results which don't have a connection waiting for
+        # them. As the results and waiters are not part of our cache, we have
+        # to look them up directly from ZK.
+        waiters1 = set(self.kazoo_client.get_children(self.WAITER_ROOT))
+        results = set(self.kazoo_client.get_children(self.RESULT_ROOT))
+        result_data = set(self.kazoo_client.get_children(
+            self.RESULT_DATA_ROOT))
+        waiters2 = set(self.kazoo_client.get_children(self.WAITER_ROOT))
+
+        waiters = waiters1.union(waiters2)
+        lost_results = results - waiters
+        lost_data = result_data - waiters
+        return lost_results, lost_data
+
+    def cleanup(self, age=300):
+        # Delete request params which are not associated with
+        # any current requests. Note, this does not clean up
+        # lost requests themselves; the client takes care of that.
+        try:
+            for path in self._findLostParams(age):
+                try:
+                    self.log.error("Removing request params: %s", path)
+                    self.kazoo_client.delete(path, recursive=True)
+                except Exception:
+                    self.log.exception(
+                        "Unable to delete request params %s", path)
+        except Exception:
+            self.log.exception(
+                "Error cleaning up request queue %s", self)
+        try:
+            lost_results, lost_data = self._findLostResults()
+            for result_id in lost_results:
+                try:
+                    path = '/'.join([self.RESULT_ROOT, result_id])
+                    self.log.error("Removing request result: %s", path)
+                    self.kazoo_client.delete(path, recursive=True)
+                except Exception:
+                    self.log.exception(
+                        "Unable to delete request result %s", result_id)
+            for result_id in lost_data:
+                try:
+                    path = '/'.join([self.RESULT_DATA_ROOT, result_id])
+                    self.log.error(
+                        "Removing request result data: %s", path)
+                    self.kazoo_client.delete(path, recursive=True)
+                except Exception:
+                    self.log.exception(
+                        "Unable to delete request result data %s", result_id)
+        except Exception:
+            self.log.exception(
+                "Error cleaning up result queue %s", self)
+
+    @staticmethod
+    def _bytesToDict(data):
+        return json.loads(data.decode("utf-8"))
+
+    @staticmethod
+    def _dictToBytes(data):
+        # The custom json_dumps() will also serialize MappingProxyType objects
+        return json_dumps(data).encode("utf-8")
+
+    def _getParamsPath(self, uuid):
+        return '/'.join([self.PARAM_ROOT, uuid])
+
+    def clearParams(self, request):
+        """Erase the parameters from ZK to save space"""
+        self.kazoo_client.delete(self._getParamsPath(request.uuid),
+                                 recursive=True)
+
+    def getParams(self, request):
+        """Return the parameters for a request, if they exist.
+
+        Once a request is accepted by a server, the params
+        may be erased from ZK; this will return None in that case.
+
+        """
+        with sharding.BufferedShardReader(
+            self.kazoo_client, self._getParamsPath(request.uuid)
+        ) as stream:
+            data = stream.read()
+            if not data:
+                return None
+            return self._bytesToDict(data)
+
+    def deleteResult(self, path):
+        with suppress(NoNodeError):
+            self.kazoo_client.delete(path, recursive=True)
diff --git a/zuul/zk/merger.py b/zuul/zk/merger.py
index f162d23ee7..5ae86b3ab2 100644
--- a/zuul/zk/merger.py
+++ b/zuul/zk/merger.py
@@ -12,483 +12,16 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import json import logging -import time -from contextlib import suppress -from kazoo.exceptions import LockTimeout, NoNodeError -from kazoo.protocol.states import EventType -from kazoo.recipe.lock import Lock - -from zuul.lib.jsonutil import json_dumps -from zuul.lib.logutil import get_annotated_logger from zuul.model import MergeRequest -from zuul.zk import ZooKeeperSimpleBase, sharding -from zuul.zk.event_queues import MergerEventResultFuture -from zuul.zk.exceptions import MergeRequestNotFound -from zuul.zk.vendor.watchers import ExistingDataWatch +from zuul.zk.job_request_queue import JobRequestQueue -class MergerApi(ZooKeeperSimpleBase): - - MERGE_REQUEST_ROOT = "/zuul/merge-requests" - MERGE_PARAMS_ROOT = "/zuul/merge-params" - MERGE_RESULT_ROOT = "/zuul/merge-results" - MERGE_RESULT_DATA_ROOT = "/zuul/merge-result-data" - MERGE_WAITER_ROOT = "/zuul/merge-waiters" - LOCK_ROOT = "/zuul/merge-request-locks" - - log = logging.getLogger("zuul.zk.merger.MergerApi") +class MergerApi(JobRequestQueue): + log = logging.getLogger("zuul.MergerApi") + request_class = MergeRequest # type: ignore def __init__(self, client, merge_request_callback=None): - super().__init__(client) - - self.merge_request_callback = merge_request_callback - - # path -> merge request - self._cached_merge_requests = {} - - self.register() - - @property - def initial_state(self): - # This supports holding merge requests in tests - return MergeRequest.REQUESTED - - def register(self): - self.kazoo_client.ensure_path(self.MERGE_REQUEST_ROOT) - self.kazoo_client.ensure_path(self.MERGE_PARAMS_ROOT) - self.kazoo_client.ensure_path(self.MERGE_RESULT_ROOT) - self.kazoo_client.ensure_path(self.MERGE_RESULT_DATA_ROOT) - self.kazoo_client.ensure_path(self.MERGE_WAITER_ROOT) - self.kazoo_client.ensure_path(self.LOCK_ROOT) - - # Register a child watch that listens for new merge requests - self.kazoo_client.ChildrenWatch( - self.MERGE_REQUEST_ROOT, - self._makeMergeRequestWatcher(self.MERGE_REQUEST_ROOT), - send_event=True, - ) - - def _makeMergeStateWatcher(self, path): - def watch(data, stat, event=None): - return self._watchMergeState(path, data, stat, event) - return watch - - def _watchMergeState(self, path, data, stat, event=None): - if not event or event.type == EventType.CHANGED: - # Don't process change events w/o any data. This can happen when a - # "slow" change watch tried to retrieve the data of a znode that - # was deleted in the meantime. - if not data: - return - # As we already get the data and the stat value, we can directly - # use it without asking ZooKeeper for the data again. - content = self._bytesToDict(data) - if not content: - return - - # We need this one for the HOLD -> REQUESTED check further down - old_merge_request = self._cached_merge_requests.get(path) - - merge_request = MergeRequest.fromDict(content) - merge_request.path = path - merge_request._zstat = stat - self._cached_merge_requests[path] = merge_request - - # NOTE (felix): This is a test-specific condition: For test cases - # which are using hold_merge_jobs_in_queue the state change on the - # merge request from HOLD to REQUESTED is done outside of the - # merger. - # Thus, we must also set the wake event (the callback) so the - # merger can pick up those jobs after they are released. To not - # cause a thundering herd problem in production for each cache - # update, the callback is only called under this very specific - # condition that can only occur in the tests. 
- if ( - self.merge_request_callback - and old_merge_request - and old_merge_request.state == MergeRequest.HOLD - and merge_request.state == MergeRequest.REQUESTED - ): - self.merge_request_callback() - - elif event.type == EventType.DELETED: - with suppress(KeyError): - del self._cached_merge_requests[path] - - # Return False to stop the datawatch as the build got deleted. - return False - - def _makeMergeRequestWatcher(self, path): - def watch(merge_requests, event=None): - return self._watchMergeRequests(path, merge_requests) - return watch - - def _watchMergeRequests(self, path, merge_requests): - # The merge_requests list always contains all active children. Thus, we - # first have to find the new ones by calculating the delta between the - # merge_requests list and our current cache entries. - # NOTE (felix): We could also use this list to determine the deleted - # merge requests, but it's easier to do this in the DataWatch for the - # single merge request instead. Otherwise we have to deal with race - # conditions between the children and the data watch as one watch might - # update a cache entry while the other tries to remove it. - - merge_request_paths = { - f"{path}/{uuid}" for uuid in merge_requests - } - - new_merge_requests = merge_request_paths - set( - self._cached_merge_requests.keys() - ) - - for req_path in new_merge_requests: - ExistingDataWatch(self.kazoo_client, - req_path, - self._makeMergeStateWatcher(req_path)) - - # Notify the user about new merge requests if a callback is provided. - # When we register the data watch, we will receive an initial - # callback immediately. The list of children may be empty in - # that case, so we should not fire our callback since there - # are no merge requests to handle. - - if new_merge_requests and self.merge_request_callback: - self.merge_request_callback() - - def _iterMergeRequests(self): - # As the entries in the cache dictionary are added and removed via - # data and children watches, we can't simply iterate over it in here, - # as the values might change during iteration. - for key in list(self._cached_merge_requests.keys()): - try: - merge_request = self._cached_merge_requests[key] - except KeyError: - continue - yield merge_request - - def inState(self, *states): - if not states: - # If no states are provided, build a tuple containing all available - # ones to always match. We need a tuple to be compliant to the - # type of *states above. - states = MergeRequest.ALL_STATES - - merge_requests = list( - filter(lambda b: b.state in states, self._iterMergeRequests()) - ) - - # Sort the list of merge requests by precedence and their creation time - # in ZooKeeper in ascending order to prevent older requests from - # starving. - return (b for b in sorted(merge_requests)) - - def next(self): - yield from self.inState(MergeRequest.REQUESTED) - - def submit(self, uuid, job_type, build_set_uuid, tenant_name, - pipeline_name, params, event_id, precedence=0, - needs_result=False): - log = get_annotated_logger(self.log, event=event_id) - - path = "/".join([self.MERGE_REQUEST_ROOT, uuid]) - - merge_request = MergeRequest( - uuid, - self.initial_state, - job_type, - precedence, - build_set_uuid, - tenant_name, - pipeline_name, - event_id, - ) - - result = None - - # If a result is needed, create the result_path with the same UUID and - # store it on the merge request, so the merger server can store the - # result there. 
- if needs_result: - result_path = "/".join( - [self.MERGE_RESULT_ROOT, merge_request.uuid] - ) - waiter_path = "/".join( - [self.MERGE_WAITER_ROOT, merge_request.uuid] - ) - self.kazoo_client.create(waiter_path, ephemeral=True) - result = MergerEventResultFuture(self.client, result_path, - waiter_path) - merge_request.result_path = result_path - - log.debug("Submitting merge request to ZooKeeper %s", merge_request) - - params_path = self._getParamsPath(uuid) - with sharding.BufferedShardWriter( - self.kazoo_client, params_path - ) as stream: - stream.write(self._dictToBytes(params)) - - self.kazoo_client.create( - path, self._dictToBytes(merge_request.toDict())) - - return result - - def update(self, merge_request): - log = get_annotated_logger( - self.log, event=None, build=merge_request.uuid - ) - log.debug("Updating merge request %s", merge_request) - - if merge_request._zstat is None: - log.debug( - "Cannot update merge request %s: Missing version information.", - merge_request.uuid, - ) - return - try: - zstat = self.kazoo_client.set( - merge_request.path, - self._dictToBytes(merge_request.toDict()), - version=merge_request._zstat.version, - ) - # Update the zstat on the item after updating the ZK node - merge_request._zstat = zstat - except NoNodeError: - raise MergeRequestNotFound( - f"Could not update {merge_request.path}" - ) - - def reportResult(self, merge_request, result): - # Write the result data first since it may be multiple nodes. - result_data_path = "/".join( - [self.MERGE_RESULT_DATA_ROOT, merge_request.uuid] - ) - with sharding.BufferedShardWriter( - self.kazoo_client, result_data_path) as stream: - stream.write(self._dictToBytes(result)) - - # Then write the (empty) result note to signify it's ready. - data = {'result_data_path': result_data_path} - self.kazoo_client.create(merge_request.result_path, - self._dictToBytes(data)) - - def get(self, path): - """Get a merge request - - Note: do not mix get with iteration; iteration returns cached - MergeRequests while get returns a newly created object each time. If - you lock a MergeRequest, you must use the same object to unlock it. - - """ - try: - data, zstat = self.kazoo_client.get(path) - except NoNodeError: - return None - - if not data: - return None - - content = self._bytesToDict(data) - - merge_request = MergeRequest.fromDict(content) - merge_request.path = path - merge_request._zstat = zstat - - return merge_request - - def remove(self, merge_request): - self.log.debug("Removing merge request %s", merge_request) - try: - self.kazoo_client.delete(merge_request.path, recursive=True) - except NoNodeError: - # Nothing to do if the node is already deleted - pass - self.clearMergeParams(merge_request) - try: - # Delete the lock parent node as well - path = "/".join([self.LOCK_ROOT, merge_request.uuid]) - self.kazoo_client.delete(path, recursive=True) - except NoNodeError: - pass - - def lock(self, merge_request, blocking=True, timeout=None): - path = "/".join([self.LOCK_ROOT, merge_request.uuid]) - have_lock = False - lock = None - try: - lock = Lock(self.kazoo_client, path) - have_lock = lock.acquire(blocking, timeout) - except LockTimeout: - have_lock = False - self.log.error( - "Timeout trying to acquire lock: %s", merge_request.uuid - ) - - # If we aren't blocking, it's possible we didn't get the lock - # because someone else has it. 
- if not have_lock: - return False - - if not self.kazoo_client.exists(merge_request.path): - lock.release() - self.log.error( - "Merge not found for locking: %s", merge_request.uuid - ) - - # We may have just re-created the lock parent node just after the - # scheduler deleted it; therefore we should (re-) delete it. - try: - # Delete the lock parent node as well. - path = "/".join([self.LOCK_ROOT, merge_request.uuid]) - self.kazoo_client.delete(path, recursive=True) - except NoNodeError: - pass - - return False - - merge_request.lock = lock - - return True - - def unlock(self, merge_request): - if merge_request.lock is None: - self.log.warning( - "MergeRequest %s does not hold a lock", merge_request - ) - else: - merge_request.lock.release() - merge_request.lock = None - - def isLocked(self, merge_request): - path = "/".join([self.LOCK_ROOT, merge_request.uuid]) - lock = Lock(self.kazoo_client, path) - is_locked = len(lock.contenders()) > 0 - return is_locked - - def lostMergeRequests(self): - # Get a list of merge requests which are running but not locked by any - # merger. - yield from filter( - lambda b: not self.isLocked(b), - self.inState(MergeRequest.RUNNING), - ) - - def _getAllRequestIds(self): - # Get a list of all request ids without using the cache. - return self.kazoo_client.get_children(self.MERGE_REQUEST_ROOT) - - def _findLostParams(self, age): - # Get data nodes which are older than the specified age (we - # don't want to delete nodes which are just being written - # slowly). - # Convert to MS - now = int(time.time() * 1000) - age = age * 1000 - data_nodes = dict() - for data_id in self.kazoo_client.get_children(self.MERGE_PARAMS_ROOT): - data_path = self._getParamsPath(data_id) - data_zstat = self.kazoo_client.exists(data_path) - if now - data_zstat.mtime > age: - data_nodes[data_id] = data_path - - # If there are no candidate data nodes, we don't need to - # filter them by known requests. - if not data_nodes: - return data_nodes.values() - - # Remove current request uuids - for request_id in self._getAllRequestIds(): - if request_id in data_nodes: - del data_nodes[request_id] - - # Return the paths - return data_nodes.values() - - def _findLostMergeResults(self): - # Get a list of merge results which don't have a connection waiting for - # them. As the results and waiters are not part of our cache, we have - # to look them up directly from ZK. - waiters1 = set(self.kazoo_client.get_children(self.MERGE_WAITER_ROOT)) - results = set(self.kazoo_client.get_children(self.MERGE_RESULT_ROOT)) - result_data = set(self.kazoo_client.get_children( - self.MERGE_RESULT_DATA_ROOT)) - waiters2 = set(self.kazoo_client.get_children(self.MERGE_WAITER_ROOT)) - - waiters = waiters1.union(waiters2) - lost_results = results - waiters - lost_data = result_data - waiters - return lost_results, lost_data - - def cleanup(self, age=300): - # Delete build request params which are not associated with - # any current build requests. Note, this does not clean up - # lost build requests themselves; the merger client takes - # care of that. 
- try: - for path in self._findLostParams(age): - try: - self.log.error("Removing merge request params: %s", path) - self.kazoo_client.delete(path, recursive=True) - except Exception: - self.log.execption( - "Unable to delete merge request params %s", path) - except Exception: - self.log.exception( - "Error cleaning up merge request queue %s", self) - try: - lost_results, lost_data = self._findLostMergeResults() - for result_id in lost_results: - try: - path = '/'.join([self.MERGE_RESULT_ROOT, result_id]) - self.log.error("Removing merge request result: %s", path) - self.kazoo_client.delete(path, recursive=True) - except Exception: - self.log.execption( - "Unable to delete merge request params %s", result_id) - for result_id in lost_data: - try: - path = '/'.join([self.MERGE_RESULT_DATA_ROOT, result_id]) - self.log.error( - "Removing merge request result data: %s", path) - self.kazoo_client.delete(path, recursive=True) - except Exception: - self.log.execption( - "Unable to delete merge request params %s", result_id) - except Exception: - self.log.exception( - "Error cleaning up merge result queue %s", self) - - @staticmethod - def _bytesToDict(data): - return json.loads(data.decode("utf-8")) - - @staticmethod - def _dictToBytes(data): - # The custom json_dumps() will also serialize MappingProxyType objects - return json_dumps(data).encode("utf-8") - - def _getParamsPath(self, uuid): - return '/'.join([self.MERGE_PARAMS_ROOT, uuid]) - - def clearMergeParams(self, merge_request): - """Erase the merge parameters from ZK to save space""" - self.kazoo_client.delete(self._getParamsPath(merge_request.uuid), - recursive=True) - - def getMergeParams(self, merge_request): - """Return the parameters for a merge request, if they exist. - - Once a merge request is accepted by an executor, the params - may be erased from ZK; this will return None in that case. - - """ - with sharding.BufferedShardReader( - self.kazoo_client, self._getParamsPath(merge_request.uuid) - ) as stream: - data = stream.read() - if not data: - return None - return self._bytesToDict(data) + root = '/zuul/merger' + super().__init__(client, root, merge_request_callback)
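
A rough usage sketch of the unified queue API follows, using only the methods defined in JobRequestQueue/MergerApi above. It assumes an already-connected `zk_client` wrapper; the payload, the placeholder result dictionary, and the assumption that `result_path` round-trips with the cached request are illustrative, not part of this change.

    # Hypothetical sketch, assuming `zk_client` is an already-connected
    # Zuul ZooKeeper client wrapper.
    from zuul.model import MergeRequest
    from zuul.zk.merger import MergerApi

    merger_api = MergerApi(zk_client)

    # Submitter side: enqueue a request together with its (sharded) params.
    request = MergeRequest(
        uuid='A',
        job_type=MergeRequest.MERGE,
        build_set_uuid='AA',
        tenant_name='tenant',
        pipeline_name='check',
        event_id='1',
    )
    # needs_result=True creates an ephemeral waiter node and returns a
    # JobResultFuture the submitter can later use to read the result.
    future = merger_api.submit(request, {'merge': 'test'}, needs_result=True)

    # Server side: pick up REQUESTED items, lock them, and report a result.
    for req in merger_api.next():
        if not merger_api.lock(req, blocking=False):
            continue  # another merger grabbed it first
        try:
            params = merger_api.getParams(req)  # would feed the actual merge
            merger_api.clearParams(req)         # params may be erased once accepted
            req.state = MergeRequest.RUNNING
            merger_api.update(req)
            result = {'merged': True}           # placeholder for real merge output
            if getattr(req, 'result_path', None):
                # Assumes result_path is serialized along with the request.
                merger_api.reportResult(req, result)
        finally:
            merger_api.unlock(req)
        merger_api.remove(req)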