Browse Source

Merge "Fix missing semaphore release around dequeue"

tags/3.6.0
Zuul 2 months ago
parent
commit
ec5af360f2
2 changed files with 95 additions and 3 deletions
  1. 82
    0
      tests/unit/test_scheduler.py
  2. 13
    3
      zuul/manager/__init__.py

+ 82
- 0
tests/unit/test_scheduler.py View File

@@ -6422,6 +6422,88 @@ class TestSemaphore(ZuulTestCase):
6422 6422
         self.executor_server.release()
6423 6423
         self.waitUntilSettled()
6424 6424
 
6425
+    def test_semaphore_abandon_pending_node_request(self):
6426
+        "Test abandon with job semaphores and pending node request"
6427
+        self.executor_server.hold_jobs_in_build = True
6428
+
6429
+        # Pause nodepool so we can check the ordering of getting the nodes
6430
+        # and acquiring the semaphore.
6431
+        self.fake_nodepool.paused = True
6432
+
6433
+        tenant = self.sched.abide.tenants.get('tenant-one')
6434
+        check_pipeline = tenant.layout.pipelines['check']
6435
+
6436
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
6437
+        self.assertFalse('test-semaphore' in
6438
+                         tenant.semaphore_handler.semaphores)
6439
+
6440
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
6441
+        self.waitUntilSettled()
6442
+
6443
+        self.assertTrue('test-semaphore' in
6444
+                        tenant.semaphore_handler.semaphores)
6445
+
6446
+        self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
6447
+        self.waitUntilSettled()
6448
+
6449
+        # The check pipeline should be empty
6450
+        items = check_pipeline.getAllItems()
6451
+        self.assertEqual(len(items), 0)
6452
+
6453
+        # The semaphore should be released
6454
+        self.assertFalse('test-semaphore' in
6455
+                         tenant.semaphore_handler.semaphores)
6456
+
6457
+        self.executor_server.hold_jobs_in_build = False
6458
+        self.fake_nodepool.paused = False
6459
+        self.executor_server.release()
6460
+        self.waitUntilSettled()
6461
+
6462
+    def test_semaphore_abandon_pending_execution(self):
6463
+        "Test abandon with job semaphores and pending job execution"
6464
+
6465
+        # Pause the executor so it doesn't take any jobs.
6466
+        self.executor_server.pause()
6467
+
6468
+        # Pause nodepool so we can wait on the node requests and fulfill them
6469
+        # in a controlled manner.
6470
+        self.fake_nodepool.paused = True
6471
+
6472
+        tenant = self.sched.abide.tenants.get('tenant-one')
6473
+        check_pipeline = tenant.layout.pipelines['check']
6474
+
6475
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
6476
+        self.assertFalse('test-semaphore' in
6477
+                         tenant.semaphore_handler.semaphores)
6478
+
6479
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
6480
+        self.waitUntilSettled()
6481
+        self.assertEqual(len(self.nodepool.requests), 2)
6482
+
6483
+        # Now unpause nodepool to fulfill the node requests. We cannot use
6484
+        # waitUntilSettled here because the executor is paused.
6485
+        self.fake_nodepool.paused = False
6486
+        for _ in iterate_timeout(30, 'fulfill node requests'):
6487
+            if len(self.nodepool.requests) == 0:
6488
+                break
6489
+
6490
+        self.assertTrue('test-semaphore' in
6491
+                        tenant.semaphore_handler.semaphores)
6492
+
6493
+        self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
6494
+        self.waitUntilSettled()
6495
+
6496
+        # The check pipeline should be empty
6497
+        items = check_pipeline.getAllItems()
6498
+        self.assertEqual(len(items), 0)
6499
+
6500
+        # The semaphore should be released
6501
+        self.assertFalse('test-semaphore' in
6502
+                         tenant.semaphore_handler.semaphores)
6503
+
6504
+        self.executor_server.release()
6505
+        self.waitUntilSettled()
6506
+
6425 6507
     def test_semaphore_new_patchset(self):
6426 6508
         "Test new patchset with job semaphores"
6427 6509
         self.executor_server.hold_jobs_in_build = True

+ 13
- 3
zuul/manager/__init__.py View File

@@ -420,11 +420,16 @@ class PipelineManager(object):
420 420
     def cancelJobs(self, item, prime=True):
421 421
         self.log.debug("Cancel jobs for change %s" % item.change)
422 422
         canceled = False
423
+        jobs_to_release = []
424
+
423 425
         old_build_set = item.current_build_set
426
+        old_jobs = {job.name: job for job in item.getJobs()}
427
+
424 428
         if prime and item.current_build_set.ref:
425 429
             item.resetAllBuilds()
426 430
         for req in old_build_set.node_requests.values():
427 431
             self.sched.nodepool.cancelRequest(req)
432
+            jobs_to_release.append(req.job)
428 433
         old_build_set.node_requests = {}
429 434
         canceled_jobs = set()
430 435
         for build in old_build_set.getBuilds():
@@ -437,9 +442,7 @@ class PipelineManager(object):
437 442
             except Exception:
438 443
                 self.log.exception("Exception while canceling build %s "
439 444
                                    "for change %s" % (build, item.change))
440
-            tenant = old_build_set.item.pipeline.tenant
441
-            tenant.semaphore_handler.release(
442
-                old_build_set.item, build.job)
445
+            jobs_to_release.append(build.job)
443 446
 
444 447
             if not was_running:
445 448
                 nodeset = build.build_set.getJobNodeSet(build.job.name)
@@ -451,6 +454,13 @@ class PipelineManager(object):
451 454
             if jobname in canceled_jobs:
452 455
                 continue
453 456
             self.sched.nodepool.returnNodeSet(nodeset)
457
+            jobs_to_release.append(old_jobs[jobname])
458
+
459
+        for job in jobs_to_release:
460
+            tenant = old_build_set.item.pipeline.tenant
461
+            tenant.semaphore_handler.release(
462
+                old_build_set.item, job)
463
+
454 464
         for item_behind in item.items_behind:
455 465
             self.log.debug("Canceling jobs for change %s, behind change %s" %
456 466
                            (item_behind.change, item.change))

Loading…
Cancel
Save