
Merge "Set relative priority of node requests"

commit 16c55fa267 (tags/3.4.0)
Zuul, 9 months ago

doc/source/admin/components.rst (+26, -0)

@@ -276,6 +276,32 @@ The following sections of ``zuul.conf`` are used by the scheduler:
 
       Path to directory in which Zuul should save its state.
 
+   .. attr:: relative_priority
+      :default: False
+
+      A boolean which indicates whether the scheduler should supply
+      relative priority information for node requests.
+
+      In all cases, each pipeline may specify a precedence value which
+      is used by Nodepool to satisfy requests from higher-precedence
+      pipelines first.  If ``relative_priority`` is set to ``True``,
+      then Zuul will additionally group items in the same pipeline by
+      project and weight each request by its position in that
+      project's group.  A request for the first change of a given
+      project will have the highest relative priority, and the second
+      change a lower relative priority.  The first change of each
+      project in a pipeline has the same relative priority, regardless
+      of the order of submission or how many other changes are in the
+      pipeline.  This can be used to make node allocations complete
+      faster for projects with fewer changes in a system dominated by
+      projects with more changes.
+
+      If this value is ``False`` (the default), then node requests are
+      sorted by pipeline precedence followed by the order in which
+      they were submitted.  If this is ``True``, they are sorted by
+      pipeline precedence, followed by relative priority, and finally
+      the order in which they were submitted.
+
 Operation
 ~~~~~~~~~
 
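For reference, enabling the option is a single line in the ``[scheduler]`` section of ``zuul.conf``. A minimal sketch (the ``tenant_config`` value is illustrative, mirroring the test fixture updated below):

    [scheduler]
    tenant_config=main.yaml
    relative_priority=true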

releasenotes/notes/relative_priority-dee014da5977da36.yaml (+6, -0)

@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    A new scheduler option, :attr:`scheduler.relative_priority`, can
+    be used to instruct Nodepool to fulfill requests from less-busy
+    projects more quickly.

tests/base.py (+14, -1)

@@ -1737,6 +1737,7 @@ class FakeNodepool(object):
     log = logging.getLogger("zuul.test.FakeNodepool")
 
     def __init__(self, host, port, chroot):
+        self.complete_event = threading.Event()
         self.host_keys = None
         self.client = kazoo.client.KazooClient(
             hosts='%s:%s%s' % (host, port, chroot))
@@ -1756,12 +1757,21 @@ class FakeNodepool(object):
         self.client.stop()
         self.client.close()
 
+    def pause(self):
+        self.complete_event.wait()
+        self.paused = True
+
+    def unpause(self):
+        self.paused = False
+
     def run(self):
         while self._running:
+            self.complete_event.clear()
            try:
                self._run()
            except Exception:
                self.log.exception("Error in fake nodepool:")
+            self.complete_event.set()
            time.sleep(0.1)
 
     def _run(self):
@@ -1776,7 +1786,7 @@ class FakeNodepool(object):
         except kazoo.exceptions.NoNodeError:
             return []
         reqs = []
-        for oid in sorted(reqids):
+        for oid in reqids:
             path = self.REQUEST_ROOT + '/' + oid
             try:
                 data, stat = self.client.get(path)
@@ -1785,6 +1795,9 @@ class FakeNodepool(object):
                 reqs.append(data)
             except kazoo.exceptions.NoNodeError:
                 pass
+        reqs.sort(key=lambda r: (r['_oid'].split('-')[0],
+                                 r['relative_priority'],
+                                 r['_oid'].split('-')[1]))
         return reqs
 
     def getNodes(self):
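The new sort key mirrors the ordering Nodepool applies: each oid has the form ``<precedence>-<serial>``, so requests sort by pipeline precedence, then relative priority, then submission serial. A minimal standalone sketch of that ordering with illustrative request dicts (not the test harness itself):

    # Each oid is '<pipeline precedence>-<submission serial>'; the
    # prefixes compare lexicographically because they share a width.
    requests = [
        {'_oid': '200-0000000001', 'relative_priority': 1},  # check, 2nd change
        {'_oid': '100-0000000002', 'relative_priority': 0},  # gate (higher precedence)
        {'_oid': '200-0000000000', 'relative_priority': 0},  # check, 1st change
    ]
    requests.sort(key=lambda r: (r['_oid'].split('-')[0],
                                 r['relative_priority'],
                                 r['_oid'].split('-')[1]))
    print([r['_oid'] for r in requests])
    # ['100-0000000002', '200-0000000000', '200-0000000001']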

tests/fixtures/layouts/two-projects-integrated.yaml (+79, -0)

@@ -0,0 +1,79 @@
+- pipeline:
+    name: check
+    manager: independent
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- pipeline:
+    name: gate
+    manager: dependent
+    success-message: Build succeeded (gate).
+    trigger:
+      gerrit:
+        - event: comment-added
+          approval:
+            - Approved: 1
+    success:
+      gerrit:
+        Verified: 2
+        submit: true
+    failure:
+      gerrit:
+        Verified: -2
+    start:
+      gerrit:
+        Verified: 0
+    precedence: high
+
+- job:
+    name: base
+    parent: null
+    run: playbooks/base.yaml
+    nodeset:
+      nodes:
+        - name: controller
+          label: ubuntu-xenial
+
+- job:
+    name: test
+    run: playbooks/test.yaml
+
+- job:
+    name: integration
+    run: playbooks/integration.yaml
+
+- project:
+    name: org/project
+    check:
+      jobs:
+        - test
+    gate:
+      jobs:
+        - test
+
+- project:
+    name: org/project1
+    check:
+      jobs:
+        - integration
+    gate:
+      queue: integrated
+      jobs:
+        - integration
+
+- project:
+    name: org/project2
+    check:
+      jobs:
+        - integration
+    gate:
+      queue: integrated
+      jobs:
+        - integration
tests/fixtures/zuul.conf (+1, -0)

@@ -8,6 +8,7 @@ server=127.0.0.1
 
 [scheduler]
 tenant_config=main.yaml
+relative_priority=true
 
 [merger]
 git_dir=/tmp/zuul-test/merger-git

tests/nodepool/test_nodepool_integration.py (+4, -4)

@@ -58,7 +58,7 @@ class TestNodepoolIntegration(BaseTestCase):
         nodeset.addNode(model.Node(['controller'], 'fake-label'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, model.STATE_FULFILLED)
@@ -88,7 +88,7 @@ class TestNodepoolIntegration(BaseTestCase):
         nodeset.addNode(model.Node(['controller'], 'invalid-label'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, model.STATE_FAILED)
@@ -103,7 +103,7 @@ class TestNodepoolIntegration(BaseTestCase):
         job = model.Job('testjob')
         job.nodeset = nodeset
         self.fake_nodepool.paused = True
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.zk.client.stop()
         self.zk.client.start()
         self.fake_nodepool.paused = False
@@ -121,7 +121,7 @@ class TestNodepoolIntegration(BaseTestCase):
         job = model.Job('testjob')
         job.nodeset = nodeset
         self.fake_nodepool.paused = True
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.nodepool.cancelRequest(request)
 
         self.waitForRequests()

tests/unit/test_nodepool.py (+26, -8)

@@ -71,7 +71,7 @@ class TestNodepool(BaseTestCase):
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')
@@ -103,11 +103,11 @@ class TestNodepool(BaseTestCase):
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        self.fake_nodepool.paused = True
-        request = self.nodepool.requestNodes(None, job)
+        self.fake_nodepool.pause()
+        request = self.nodepool.requestNodes(None, job, 0)
         self.zk.client.stop()
         self.zk.client.start()
-        self.fake_nodepool.paused = False
+        self.fake_nodepool.unpause()
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')
@@ -120,8 +120,8 @@ class TestNodepool(BaseTestCase):
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        self.fake_nodepool.paused = True
-        request = self.nodepool.requestNodes(None, job)
+        self.fake_nodepool.pause()
+        request = self.nodepool.requestNodes(None, job, 0)
         self.nodepool.cancelRequest(request)
 
         self.waitForRequests()
@@ -135,7 +135,7 @@ class TestNodepool(BaseTestCase):
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')
@@ -156,7 +156,7 @@ class TestNodepool(BaseTestCase):
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')
@@ -170,3 +170,21 @@ class TestNodepool(BaseTestCase):
         for node in nodeset.getNodes():
             self.assertIsNone(node.lock)
             self.assertEqual(node.state, 'ready')
+
+    def test_node_request_priority(self):
+        # Test that requests are satisfied in priority order
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
+        nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        self.fake_nodepool.pause()
+        request1 = self.nodepool.requestNodes(None, job, 1)
+        request2 = self.nodepool.requestNodes(None, job, 0)
+        self.fake_nodepool.unpause()
+        self.waitForRequests()
+        self.assertEqual(len(self.provisioned_requests), 2)
+        self.assertEqual(request1.state, 'fulfilled')
+        self.assertEqual(request2.state, 'fulfilled')
+        self.assertTrue(request2.state_time < request1.state_time)

tests/unit/test_scheduler.py (+97, -16)

@@ -5025,7 +5025,7 @@ For CI problems and help debugging, contact ci@example.org"""
     def test_zookeeper_disconnect(self):
         "Test that jobs are executed after a zookeeper disconnect"
 
-        self.fake_nodepool.paused = True
+        self.fake_nodepool.pause()
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('Code-Review', 2)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5033,7 +5033,7 @@ For CI problems and help debugging, contact ci@example.org"""
 
         self.zk.client.stop()
         self.zk.client.start()
-        self.fake_nodepool.paused = False
+        self.fake_nodepool.unpause()
         self.waitUntilSettled()
 
         self.assertEqual(A.data['status'], 'MERGED')
@@ -5044,7 +5044,7 @@ For CI problems and help debugging, contact ci@example.org"""
 
         # This tests receiving a ZK disconnect between the arrival of
         # a fulfilled request and when we accept its nodes.
-        self.fake_nodepool.paused = True
+        self.fake_nodepool.pause()
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('Code-Review', 2)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5056,7 +5056,7 @@ For CI problems and help debugging, contact ci@example.org"""
         self.sched.run_handler_lock.acquire()
 
         # Fulfill the nodepool request.
-        self.fake_nodepool.paused = False
+        self.fake_nodepool.unpause()
         requests = list(self.sched.nodepool.requests.values())
         self.assertEqual(1, len(requests))
         request = requests[0]
@@ -5090,7 +5090,7 @@ For CI problems and help debugging, contact ci@example.org"""
     def test_nodepool_failure(self):
         "Test that jobs are reported after a nodepool failure"
 
-        self.fake_nodepool.paused = True
+        self.fake_nodepool.pause()
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('Code-Review', 2)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5099,7 +5099,7 @@ For CI problems and help debugging, contact ci@example.org"""
         req = self.fake_nodepool.getNodeRequests()[0]
         self.fake_nodepool.addFailRequest(req)
 
-        self.fake_nodepool.paused = False
+        self.fake_nodepool.unpause()
         self.waitUntilSettled()
 
         self.assertEqual(A.data['status'], 'NEW')
@@ -5108,10 +5108,10 @@ For CI problems and help debugging, contact ci@example.org"""
         self.assertIn('project-test1 : SKIPPED', A.messages[1])
         self.assertIn('project-test2 : SKIPPED', A.messages[1])
 
-    def test_nodepool_priority(self):
-        "Test that nodes are requested at the correct priority"
+    def test_nodepool_pipeline_priority(self):
+        "Test that nodes are requested at the correct pipeline priority"
 
-        self.fake_nodepool.paused = True
+        self.fake_nodepool.pause()
 
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
@@ -5128,10 +5128,11 @@ For CI problems and help debugging, contact ci@example.org"""
 
         reqs = self.fake_nodepool.getNodeRequests()
 
-        # The requests come back sorted by oid. Since we have three requests
-        # for the three changes each with a different priority.
-        # Also they get a serial number based on order they were received
-        # so the number on the endof the oid should map to order submitted.
+        # The requests come back sorted by priority; we have three
+        # requests for the three changes, each at a different
+        # priority.  They also get a serial number based on the order
+        # they were received, so the number at the end of the oid
+        # should map to the order submitted.
 
         # * gate first - high priority - change C
         self.assertEqual(reqs[0]['_oid'], '100-0000000002')
@@ -5145,13 +5146,93 @@ For CI problems and help debugging, contact ci@example.org"""
         self.assertEqual(reqs[2]['_oid'], '300-0000000000')
         self.assertEqual(reqs[2]['node_types'], ['ubuntu-xenial'])
 
-        self.fake_nodepool.paused = False
+        self.fake_nodepool.unpause()
+        self.waitUntilSettled()
+
+    def test_nodepool_relative_priority_check(self):
+        "Test that nodes are requested at the relative priority"
+
+        self.fake_nodepool.pause()
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        C = self.fake_gerrit.addFakeChange('org/project1', 'master', 'C')
+        self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        reqs = self.fake_nodepool.getNodeRequests()
+
+        # The requests come back sorted by priority.
+
+        # Change A, first change for project, high relative priority.
+        self.assertEqual(reqs[0]['_oid'], '200-0000000000')
+        self.assertEqual(reqs[0]['relative_priority'], 0)
+
+        # Change C, first change for project1, high relative priority.
+        self.assertEqual(reqs[1]['_oid'], '200-0000000002')
+        self.assertEqual(reqs[1]['relative_priority'], 0)
+
+        # Change B, second change for project, lower relative priority.
+        self.assertEqual(reqs[2]['_oid'], '200-0000000001')
+        self.assertEqual(reqs[2]['relative_priority'], 1)
+
+        self.fake_nodepool.unpause()
+        self.waitUntilSettled()
+
+    @simple_layout('layouts/two-projects-integrated.yaml')
+    def test_nodepool_relative_priority_gate(self):
+        "Test that nodes are requested at the relative priority"
+
+        self.fake_nodepool.pause()
+
+        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+        B.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        # project does not share a queue with project1 and project2.
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        C.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(C.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        reqs = self.fake_nodepool.getNodeRequests()
+
+        # The requests come back sorted by priority.
+
+        # Change A, first change for shared queue, high relative
+        # priority.
+        self.assertEqual(reqs[0]['_oid'], '100-0000000000')
+        self.assertEqual(reqs[0]['relative_priority'], 0)
+
+        # Change C, first change for independent project, high
+        # relative priority.
+        self.assertEqual(reqs[1]['_oid'], '100-0000000002')
+        self.assertEqual(reqs[1]['relative_priority'], 0)
+
+        # Change B, second change for shared queue, lower relative
+        # priority.
+        self.assertEqual(reqs[2]['_oid'], '100-0000000001')
+        self.assertEqual(reqs[2]['relative_priority'], 1)
+
+        self.fake_nodepool.unpause()
         self.waitUntilSettled()
 
     def test_nodepool_job_removal(self):
         "Test that nodes are returned unused after job removal"
 
-        self.fake_nodepool.paused = True
+        self.fake_nodepool.pause()
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('Code-Review', 2)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5161,7 +5242,7 @@ For CI problems and help debugging, contact ci@example.org"""
         self.sched.reconfigure(self.config)
         self.waitUntilSettled()
 
-        self.fake_nodepool.paused = False
+        self.fake_nodepool.unpause()
         self.waitUntilSettled()
 
         self.assertEqual(A.data['status'], 'MERGED')

zuul/manager/__init__.py (+16, -1)

@@ -85,6 +85,11 @@ class PipelineManager(object):
                 return True
         return False
 
+    def getNodePriority(self, item):
+        items = self.pipeline.getAllItems()
+        items = [i for i in items if i.change.project == item.change.project]
+        return items.index(item)
+
     def isChangeAlreadyInPipeline(self, change):
         # Checks live items in the pipeline
         for item in self.pipeline.getAllItems():
@@ -327,8 +332,12 @@ class PipelineManager(object):
             return False
         build_set = item.current_build_set
         self.log.debug("Requesting nodes for change %s" % item.change)
+        if self.sched.use_relative_priority:
+            priority = item.getNodePriority()
+        else:
+            priority = 0
         for job in jobs:
-            req = self.sched.nodepool.requestNodes(build_set, job)
+            req = self.sched.nodepool.requestNodes(build_set, job, priority)
             self.log.debug("Adding node request %s for job %s to item %s" %
                            (req, job, item))
             build_set.setJobNodeRequest(job.name, req)
@@ -687,6 +696,12 @@ class PipelineManager(object):
         if failing_reasons:
             self.log.debug("%s is a failing item because %s" %
                            (item, failing_reasons))
+        if not dequeued and self.sched.use_relative_priority:
+            priority = item.getNodePriority()
+            for node_request in item.current_build_set.node_requests.values():
+                if node_request.relative_priority != priority:
+                    self.sched.nodepool.reviseNodeRequest(
+                        node_request, priority)
         return (changed, nnfi)
 
     def processQueue(self):

zuul/manager/dependent.py (+5, -0)

@@ -93,6 +93,11 @@ class DependentPipelineManager(PipelineManager):
             self.log.debug("Dynamically created queue %s", change_queue)
             return DynamicChangeQueueContextManager(change_queue)
 
+    def getNodePriority(self, item):
+        with self.getChangeQueue(item.change) as change_queue:
+            items = change_queue.queue
+            return items.index(item)
+
     def isChangeReadyToBeEnqueued(self, change):
         source = change.project.source
         if not source.canMerge(change, self.getSubmitAllowNeeds()):

zuul/model.py (+17, -4)

@@ -688,7 +688,7 @@ class NodeSet(ConfigObject):
 class NodeRequest(object):
     """A request for a set of nodes."""
 
-    def __init__(self, requestor, build_set, job, nodeset):
+    def __init__(self, requestor, build_set, job, nodeset, relative_priority):
         self.requestor = requestor
         self.build_set = build_set
         self.job = job
@@ -696,9 +696,12 @@ class NodeRequest(object):
         self._state = STATE_REQUESTED
         self.requested_time = time.time()
         self.state_time = time.time()
+        self.created_time = None
         self.stat = None
         self.uid = uuid4().hex
+        self.relative_priority = relative_priority
         self.id = None
+        self._zk_data = {}  # Data that we read back from ZK
         # Zuul internal flags (not stored in ZK so they are not
         # overwritten).
         self.failed = False
@@ -731,17 +734,24 @@ class NodeRequest(object):
         return '<NodeRequest %s %s>' % (self.id, self.nodeset)
 
     def toDict(self):
-        d = {}
+        # Start with any previously read data
+        d = self._zk_data.copy()
         nodes = [n.label for n in self.nodeset.getNodes()]
-        d['node_types'] = nodes
-        d['requestor'] = self.requestor
+        # These are immutable once set
+        d.setdefault('node_types', nodes)
+        d.setdefault('requestor', self.requestor)
+        d.setdefault('created_time', self.created_time)
+        # We might change these
         d['state'] = self.state
         d['state_time'] = self.state_time
+        d['relative_priority'] = self.relative_priority
         return d
 
     def updateFromDict(self, data):
+        self._zk_data = data
         self._state = data['state']
         self.state_time = data['state_time']
+        self.relative_priority = data['relative_priority']
 
 
 class Secret(ConfigObject):
@@ -2268,6 +2278,9 @@ class QueueItem(object):
             fakebuild.result = 'SKIPPED'
             self.addBuild(fakebuild)
 
+    def getNodePriority(self):
+        return self.pipeline.manager.getNodePriority(self)
+
     def formatUrlPattern(self, url_pattern, job=None, build=None):
         url = None
         # Produce safe versions of objects which may be useful in
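The reworked ``toDict`` makes the update a read-modify-write merge: it starts from the data last read back from ZooKeeper, so keys written by Nodepool (such as ``nodes``) survive a Zuul-side update; ``setdefault`` writes the immutable keys only once; and the mutable keys are always refreshed. A minimal sketch of the merge with an illustrative data dict:

    # What nodepool last wrote to the znode.
    zk_data = {'nodes': ['0000000001'], 'state': 'fulfilled',
               'node_types': ['ubuntu-xenial'], 'created_time': 1000.0}

    d = zk_data.copy()                     # start from what ZK holds
    d.setdefault('node_types', ['other'])  # immutable: existing value kept
    d['state'] = 'fulfilled'               # mutable: always rewritten
    d['relative_priority'] = 1             # mutable: always rewritten

    print(d['nodes'], d['node_types'], d['relative_priority'])
    # ['0000000001'] ['ubuntu-xenial'] 1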

zuul/nodepool.py (+36, -2)

@@ -13,6 +13,7 @@
 import logging
 
 from zuul import model
+from zuul.zk import LockException
 
 
 class Nodepool(object):
@@ -51,11 +52,12 @@ class Nodepool(object):
             statsd.timing(key + '.size.%s' % len(request.nodeset.nodes), dt)
         statsd.gauge('zuul.nodepool.current_requests', len(self.requests))
 
-    def requestNodes(self, build_set, job):
+    def requestNodes(self, build_set, job, relative_priority):
         # Create a copy of the nodeset to represent the actual nodes
         # returned by nodepool.
         nodeset = job.nodeset.copy()
-        req = model.NodeRequest(self.sched.hostname, build_set, job, nodeset)
+        req = model.NodeRequest(self.sched.hostname, build_set, job,
+                                nodeset, relative_priority)
         self.requests[req.uid] = req
 
         if nodeset.nodes:
@@ -79,6 +81,38 @@ class Nodepool(object):
             except Exception:
                 self.log.exception("Error deleting node request:")
 
+    def reviseRequest(self, request, relative_priority=None):
+        '''Attempt to update the node request, if it is not currently being
+        processed.
+
+        :param NodeRequest request: The request to update.
+        :param int relative_priority: If supplied, the new relative
+            priority to set on the request.
+        '''
+        if relative_priority is None:
+            return
+        try:
+            self.sched.zk.lockNodeRequest(request, blocking=False)
+        except LockException:
+            # It may be locked by nodepool, which is fine.
+            self.log.debug("Unable to revise locked node request %s", request)
+            return False
+        try:
+            old_priority = request.relative_priority
+            request.relative_priority = relative_priority
+            self.sched.zk.storeNodeRequest(request)
+            self.log.debug("Revised relative priority of "
+                           "node request %s from %s to %s",
+                           request, old_priority, relative_priority)
+        except Exception:
+            self.log.exception("Unable to update node request %s", request)
+        finally:
+            try:
+                self.sched.zk.unlockNodeRequest(request)
+            except Exception:
+                self.log.exception("Unable to unlock node request %s", request)
+
     def holdNodeSet(self, nodeset, autohold_key):
         '''
         Perform a hold on the given set of nodes.

zuul/scheduler.py (+4, -0)

@@ -306,6 +306,10 @@ class Scheduler(threading.Thread):
         self.last_reconfigured = None
         self.tenant_last_reconfigured = {}
         self.autohold_requests = {}
+        self.use_relative_priority = False
+        if self.config.has_option('scheduler', 'relative_priority'):
+            if self.config.getboolean('scheduler', 'relative_priority'):
+                self.use_relative_priority = True
 
     def start(self):
         super(Scheduler, self).start()

zuul/zk.py (+96, -10)

@@ -41,6 +41,7 @@ class ZooKeeper(object):
     log = logging.getLogger("zuul.zk.ZooKeeper")
 
     REQUEST_ROOT = '/nodepool/requests'
+    REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
     NODE_ROOT = '/nodepool/nodes'
 
     # Log zookeeper retry every 10 seconds
@@ -162,8 +163,8 @@ class ZooKeeper(object):
             from ZooKeeper).  The watcher should return False when
             further updates are no longer necessary.
         '''
+        node_request.created_time = time.time()
         data = node_request.toDict()
-        data['created_time'] = time.time()
 
         path = '%s/%s-' % (self.REQUEST_ROOT, node_request.priority)
         path = self.client.create(path, self._dictToStr(data),
@@ -174,15 +175,7 @@ class ZooKeeper(object):
 
         def callback(data, stat):
             if data:
-                data = self._strToDict(data)
-                request_nodes = list(node_request.nodeset.getNodes())
-                for i, nodeid in enumerate(data.get('nodes', [])):
-                    node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
-                    node_data, node_stat = self.client.get(node_path)
-                    node_data = self._strToDict(node_data)
-                    request_nodes[i].id = nodeid
-                    request_nodes[i].updateFromDict(node_data)
-                node_request.updateFromDict(data)
+                self.updateNodeRequest(node_request, data)
             deleted = (data is None)  # data *are* none
             return watcher(node_request, deleted)
 
@@ -215,6 +208,34 @@ class ZooKeeper(object):
             return True
         return False
 
+    def storeNodeRequest(self, node_request):
+        '''Store the node request.
+
+        The request is expected to already exist and is updated in its
+        entirety.
+
+        :param NodeRequest node_request: The request to update.
+        '''
+
+        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+        self.client.set(path, self._dictToStr(node_request.toDict()))
+
+    def updateNodeRequest(self, node_request, data=None):
+        '''Refresh an existing node request.
+
+        :param NodeRequest node_request: The request to update.
+        :param dict data: The data to use; query ZK if absent.
+        '''
+        if data is None:
+            path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+            data, stat = self.client.get(path)
+        data = self._strToDict(data)
+        request_nodes = list(node_request.nodeset.getNodes())
+        for i, nodeid in enumerate(data.get('nodes', [])):
+            request_nodes[i].id = nodeid
+            self.updateNode(request_nodes[i], nodeid)
+        node_request.updateFromDict(data)
+
     def storeNode(self, node):
         '''Store the node.
 
@@ -227,6 +248,18 @@ class ZooKeeper(object):
         path = '%s/%s' % (self.NODE_ROOT, node.id)
         self.client.set(path, self._dictToStr(node.toDict()))
 
+    def updateNode(self, node, nodeid):
+        '''Refresh an existing node.
+
+        :param Node node: The node to update.
+        :param str nodeid: The ZooKeeper node ID.
+        '''
+
+        node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
+        node_data, node_stat = self.client.get(node_path)
+        node_data = self._strToDict(node_data)
+        node.updateFromDict(node_data)
+
     def lockNode(self, node, blocking=True, timeout=None):
         '''
         Lock a node.
@@ -268,6 +301,59 @@ class ZooKeeper(object):
         node.lock.release()
         node.lock = None
 
+    def lockNodeRequest(self, request, blocking=True, timeout=None):
+        '''
+        Lock a node request.
+
+        This will set the `lock` attribute of the request object when the
+        lock is successfully acquired.
+
+        :param NodeRequest request: The request to lock.
+        :param bool blocking: Whether or not to block on trying to
+            acquire the lock.
+        :param int timeout: When blocking, how long to wait for the lock
+            to get acquired.  None, the default, waits forever.
+
+        :raises: TimeoutException if we failed to acquire the lock when
+            blocking with a timeout.  ZKLockException if we are not blocking
+            and could not get the lock, or a lock is already held.
+        '''
+
+        path = "%s/%s" % (self.REQUEST_LOCK_ROOT, request.id)
+        try:
+            lock = Lock(self.client, path)
+            have_lock = lock.acquire(blocking, timeout)
+        except kze.LockTimeout:
+            raise LockException(
+                "Timeout trying to acquire lock %s" % path)
+        except kze.NoNodeError:
+            have_lock = False
+            self.log.error("Request not found for locking: %s", request)
+
+        # If we aren't blocking, it's possible we didn't get the lock
+        # because someone else has it.
+        if not have_lock:
+            raise LockException("Did not get lock on %s" % path)
+
+        request.lock = lock
+        self.updateNodeRequest(request)
+
+    def unlockNodeRequest(self, request):
+        '''
+        Unlock a node request.
+
+        The request must already have been locked.
+
+        :param NodeRequest request: The request to unlock.
+
+        :raises: ZKLockException if the request is not currently locked.
+        '''
+        if request.lock is None:
+            raise LockException(
+                "Request %s does not hold a lock" % request)
+        request.lock.release()
+        request.lock = None
+
     def heldNodeCount(self, autohold_key):
         '''
         Count the number of nodes being held for the given tenant/project/job.
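``reviseRequest`` in zuul/nodepool.py depends on ``lockNodeRequest`` failing fast rather than waiting behind Nodepool. A minimal standalone sketch of that non-blocking pattern using kazoo directly (the server address and lock path are illustrative):

    import kazoo.client

    client = kazoo.client.KazooClient(hosts='127.0.0.1:2181')
    client.start()

    lock = client.Lock('/nodepool/requests-lock/100-0000000000')
    # acquire(blocking=False) returns immediately; False means another
    # holder (e.g. Nodepool) owns the lock, so skip this revision.
    if lock.acquire(blocking=False):
        try:
            pass  # safe to rewrite the request znode here
        finally:
            lock.release()
    else:
        print('request busy; retry on the next queue pass')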
