Set relative priority of node requests

Add a relative_priority field to node requests and continuously
adjust it for each queue item based on the contents of queues.

This allows for a fairer distribution of build resources between
different projects.  The first item in a pipeline from a given
project (or, in the case of a dependent pipeline, group of projects)
has the same priority as the first items of all other projects in
the same pipeline.  Second items have a lower priority, and so on.

Depends-On: https://review.openstack.org/620954
Change-Id: Id3799aeb2cec6d96a662bfa394a538050f7ea947
Tag: 3.4.0
Author: James E. Blair, 6 months ago
Parent commit: 0b00c4685b

doc/source/admin/components.rst (+26, -0)

@@ -276,6 +276,32 @@ The following sections of ``zuul.conf`` are used by the scheduler:
 
       Path to directory in which Zuul should save its state.
 
+   .. attr:: relative_priority
+      :default: False
+
+      A boolean which indicates whether the scheduler should supply
+      relative priority information for node requests.
+
+      In all cases, each pipeline may specify a precedence value which
+      is used by Nodepool to satisfy requests from higher-precedence
+      pipelines first.  If ``relative_priority`` is set to ``True``,
+      then Zuul will additionally group items in the same pipeline by
+      project and weight each request by its position in that
+      project's group.  A request for the first change of a given
+      project will have the highest relative priority, and the second
+      change a lower relative priority.  The first change of each
+      project in a pipeline has the same relative priority, regardless
+      of the order of submission or how many other changes are in the
+      pipeline.  This can be used to make node allocations complete
+      faster for projects with fewer changes in a system dominated by
+      projects with more changes.
+
+      If this value is ``False`` (the default), then node requests are
+      sorted by pipeline precedence followed by the order in which
+      they were submitted.  If this is ``True``, they are sorted by
+      pipeline precedence, followed by relative priority, and finally
+      the order in which they were submitted.
+
 Operation
 ~~~~~~~~~
 
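
The ordering described above can be pictured with a small, self-contained Python sketch. This is illustrative only (it is not Zuul or Nodepool code); the dictionary keys and sample values are assumptions made for the example, with lower numbers meaning "served first", as with the 100/200/300 oid prefixes used elsewhere in this change.

# Illustrative sketch of the ordering: pipeline precedence first,
# then relative priority, then submission order.
requests = [
    {"id": "check-B", "precedence": 200, "relative_priority": 1, "serial": 1},
    {"id": "gate-C", "precedence": 100, "relative_priority": 0, "serial": 2},
    {"id": "check-A", "precedence": 200, "relative_priority": 0, "serial": 0},
]
requests.sort(key=lambda r: (r["precedence"],
                             r["relative_priority"],
                             r["serial"]))
# gate-C (higher-precedence pipeline) comes first, then check-A (first
# change of its project), then check-B (second change of that project).
print([r["id"] for r in requests])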

releasenotes/notes/relative_priority-dee014da5977da36.yaml (+6, -0)

@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    A new scheduler option, :attr:`scheduler.relative_priority`, can
+    be used to instruct Nodepool to fulfill requests from less-busy
+    projects more quickly.

tests/base.py (+14, -1)

@@ -1734,6 +1734,7 @@ class FakeNodepool(object):
     log = logging.getLogger("zuul.test.FakeNodepool")
 
     def __init__(self, host, port, chroot):
+        self.complete_event = threading.Event()
         self.host_keys = None
         self.client = kazoo.client.KazooClient(
             hosts='%s:%s%s' % (host, port, chroot))
@@ -1752,12 +1753,21 @@ class FakeNodepool(object):
         self.client.stop()
         self.client.close()
 
+    def pause(self):
+        self.complete_event.wait()
+        self.paused = True
+
+    def unpause(self):
+        self.paused = False
+
     def run(self):
         while self._running:
+            self.complete_event.clear()
             try:
                 self._run()
             except Exception:
                 self.log.exception("Error in fake nodepool:")
+            self.complete_event.set()
             time.sleep(0.1)
 
     def _run(self):
@@ -1772,7 +1782,7 @@ class FakeNodepool(object):
         except kazoo.exceptions.NoNodeError:
             return []
         reqs = []
-        for oid in sorted(reqids):
+        for oid in reqids:
             path = self.REQUEST_ROOT + '/' + oid
             try:
                 data, stat = self.client.get(path)
@@ -1781,6 +1791,9 @@ class FakeNodepool(object):
                 reqs.append(data)
             except kazoo.exceptions.NoNodeError:
                 pass
+        reqs.sort(key=lambda r: (r['_oid'].split('-')[0],
+                                 r['relative_priority'],
+                                 r['_oid'].split('-')[1]))
         return reqs
 
     def getNodes(self):
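
The pause() added above waits on complete_event so that a test only proceeds once any in-flight polling pass of the fake Nodepool has finished. A minimal sketch of the same rendezvous pattern using only the standard library (the class and method names here are generic, not the fixture's API beyond what the diff shows):

import threading
import time

class Poller:
    """Sketch: let callers pause only after the current pass completes."""

    def __init__(self):
        self.complete_event = threading.Event()
        self.paused = False
        self._running = True

    def pause(self):
        # Block until the loop below signals the end of its current pass.
        self.complete_event.wait()
        self.paused = True

    def unpause(self):
        self.paused = False

    def run(self):
        while self._running:
            self.complete_event.clear()
            if not self.paused:
                pass  # one polling pass would happen here
            self.complete_event.set()
            time.sleep(0.1)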

tests/fixtures/layouts/two-projects-integrated.yaml (+79, -0)

@@ -0,0 +1,79 @@
+- pipeline:
+    name: check
+    manager: independent
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- pipeline:
+    name: gate
+    manager: dependent
+    success-message: Build succeeded (gate).
+    trigger:
+      gerrit:
+        - event: comment-added
+          approval:
+            - Approved: 1
+    success:
+      gerrit:
+        Verified: 2
+        submit: true
+    failure:
+      gerrit:
+        Verified: -2
+    start:
+      gerrit:
+        Verified: 0
+    precedence: high
+
+- job:
+    name: base
+    parent: null
+    run: playbooks/base.yaml
+    nodeset:
+      nodes:
+        - name: controller
+          label: ubuntu-xenial
+
+- job:
+    name: test
+    run: playbooks/test.yaml
+
+- job:
+    name: integration
+    run: playbooks/integration.yaml
+
+- project:
+    name: org/project
+    check:
+      jobs:
+        - test
+    gate:
+      jobs:
+        - test
+
+- project:
+    name: org/project1
+    check:
+      jobs:
+        - integration
+    gate:
+      queue: integrated
+      jobs:
+        - integration
+
+- project:
+    name: org/project2
+    check:
+      jobs:
+        - integration
+    gate:
+      queue: integrated
+      jobs:
+        - integration

tests/fixtures/zuul.conf (+1, -0)

@@ -8,6 +8,7 @@ server=127.0.0.1
 
 [scheduler]
 tenant_config=main.yaml
+relative_priority=true
 
 [merger]
 git_dir=/tmp/zuul-test/merger-git

tests/nodepool/test_nodepool_integration.py (+4, -4)

@@ -58,7 +58,7 @@ class TestNodepoolIntegration(BaseTestCase):
         nodeset.addNode(model.Node(['controller'], 'fake-label'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, model.STATE_FULFILLED)
@@ -88,7 +88,7 @@
         nodeset.addNode(model.Node(['controller'], 'invalid-label'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, model.STATE_FAILED)
@@ -103,7 +103,7 @@
         job = model.Job('testjob')
         job.nodeset = nodeset
         self.fake_nodepool.paused = True
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.zk.client.stop()
         self.zk.client.start()
         self.fake_nodepool.paused = False
@@ -121,7 +121,7 @@
         job = model.Job('testjob')
         job.nodeset = nodeset
         self.fake_nodepool.paused = True
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.nodepool.cancelRequest(request)
 
         self.waitForRequests()

tests/unit/test_nodepool.py (+26, -8)

@@ -71,7 +71,7 @@ class TestNodepool(BaseTestCase):
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')
@@ -103,11 +103,11 @@
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        self.fake_nodepool.paused = True
-        request = self.nodepool.requestNodes(None, job)
+        self.fake_nodepool.pause()
+        request = self.nodepool.requestNodes(None, job, 0)
         self.zk.client.stop()
         self.zk.client.start()
-        self.fake_nodepool.paused = False
+        self.fake_nodepool.unpause()
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')
@@ -120,8 +120,8 @@
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        self.fake_nodepool.paused = True
-        request = self.nodepool.requestNodes(None, job)
+        self.fake_nodepool.pause()
+        request = self.nodepool.requestNodes(None, job, 0)
         self.nodepool.cancelRequest(request)
 
         self.waitForRequests()
@@ -135,7 +135,7 @@
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')
@@ -156,7 +156,7 @@
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job)
+        request = self.nodepool.requestNodes(None, job, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')
@@ -170,3 +170,21 @@
         for node in nodeset.getNodes():
             self.assertIsNone(node.lock)
             self.assertEqual(node.state, 'ready')
+
+    def test_node_request_priority(self):
+        # Test that requests are satisfied in priority order
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
+        nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        self.fake_nodepool.pause()
+        request1 = self.nodepool.requestNodes(None, job, 1)
+        request2 = self.nodepool.requestNodes(None, job, 0)
+        self.fake_nodepool.unpause()
+        self.waitForRequests()
+        self.assertEqual(len(self.provisioned_requests), 2)
+        self.assertEqual(request1.state, 'fulfilled')
+        self.assertEqual(request2.state, 'fulfilled')
+        self.assertTrue(request2.state_time < request1.state_time)

tests/unit/test_scheduler.py (+97, -16)

@@ -4972,7 +4972,7 @@ For CI problems and help debugging, contact ci@example.org"""
     def test_zookeeper_disconnect(self):
        "Test that jobs are executed after a zookeeper disconnect"
 
-        self.fake_nodepool.paused = True
+        self.fake_nodepool.pause()
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('Code-Review', 2)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -4980,7 +4980,7 @@ For CI problems and help debugging, contact ci@example.org"""
 
         self.zk.client.stop()
         self.zk.client.start()
-        self.fake_nodepool.paused = False
+        self.fake_nodepool.unpause()
         self.waitUntilSettled()
 
         self.assertEqual(A.data['status'], 'MERGED')
@@ -4991,7 +4991,7 @@ For CI problems and help debugging, contact ci@example.org"""
 
         # This tests receiving a ZK disconnect between the arrival of
         # a fulfilled request and when we accept its nodes.
-        self.fake_nodepool.paused = True
+        self.fake_nodepool.pause()
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('Code-Review', 2)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5003,7 +5003,7 @@ For CI problems and help debugging, contact ci@example.org"""
         self.sched.run_handler_lock.acquire()
 
         # Fulfill the nodepool request.
-        self.fake_nodepool.paused = False
+        self.fake_nodepool.unpause()
         requests = list(self.sched.nodepool.requests.values())
         self.assertEqual(1, len(requests))
         request = requests[0]
@@ -5037,7 +5037,7 @@ For CI problems and help debugging, contact ci@example.org"""
     def test_nodepool_failure(self):
         "Test that jobs are reported after a nodepool failure"
 
-        self.fake_nodepool.paused = True
+        self.fake_nodepool.pause()
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('Code-Review', 2)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5046,7 +5046,7 @@ For CI problems and help debugging, contact ci@example.org"""
         req = self.fake_nodepool.getNodeRequests()[0]
         self.fake_nodepool.addFailRequest(req)
 
-        self.fake_nodepool.paused = False
+        self.fake_nodepool.unpause()
         self.waitUntilSettled()
 
         self.assertEqual(A.data['status'], 'NEW')
@@ -5055,10 +5055,10 @@ For CI problems and help debugging, contact ci@example.org"""
         self.assertIn('project-test1 : SKIPPED', A.messages[1])
         self.assertIn('project-test2 : SKIPPED', A.messages[1])
 
-    def test_nodepool_priority(self):
-        "Test that nodes are requested at the correct priority"
+    def test_nodepool_pipeline_priority(self):
+        "Test that nodes are requested at the correct pipeline priority"
 
-        self.fake_nodepool.paused = True
+        self.fake_nodepool.pause()
 
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
@@ -5075,10 +5075,11 @@ For CI problems and help debugging, contact ci@example.org"""
 
         reqs = self.fake_nodepool.getNodeRequests()
 
-        # The requests come back sorted by oid. Since we have three requests
-        # for the three changes each with a different priority.
-        # Also they get a serial number based on order they were received
-        # so the number on the endof the oid should map to order submitted.
+        # The requests come back sorted by priority, since we have
+        # three requests for the three changes, each with a different
+        # priority.  They also get a serial number based on the order
+        # they were received, so the number at the end of the oid
+        # should map to the order submitted.
 
         # * gate first - high priority - change C
         self.assertEqual(reqs[0]['_oid'], '100-0000000002')
@@ -5092,13 +5093,93 @@ For CI problems and help debugging, contact ci@example.org"""
         self.assertEqual(reqs[2]['_oid'], '300-0000000000')
         self.assertEqual(reqs[2]['node_types'], ['ubuntu-xenial'])
 
-        self.fake_nodepool.paused = False
+        self.fake_nodepool.unpause()
+        self.waitUntilSettled()
+
+    def test_nodepool_relative_priority_check(self):
+        "Test that nodes are requested at the correct relative priority"
+
+        self.fake_nodepool.pause()
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        C = self.fake_gerrit.addFakeChange('org/project1', 'master', 'C')
+        self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        reqs = self.fake_nodepool.getNodeRequests()
+
+        # The requests come back sorted by priority.
+
+        # Change A, first change for project, high relative priority.
+        self.assertEqual(reqs[0]['_oid'], '200-0000000000')
+        self.assertEqual(reqs[0]['relative_priority'], 0)
+
+        # Change C, first change for project1, high relative priority.
+        self.assertEqual(reqs[1]['_oid'], '200-0000000002')
+        self.assertEqual(reqs[1]['relative_priority'], 0)
+
+        # Change B, second change for project, lower relative priority.
+        self.assertEqual(reqs[2]['_oid'], '200-0000000001')
+        self.assertEqual(reqs[2]['relative_priority'], 1)
+
+        self.fake_nodepool.unpause()
+        self.waitUntilSettled()
+
+    @simple_layout('layouts/two-projects-integrated.yaml')
+    def test_nodepool_relative_priority_gate(self):
+        "Test that nodes are requested at the correct relative priority"
+
+        self.fake_nodepool.pause()
+
+        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+        B.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        # project does not share a queue with project1 and project2.
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        C.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(C.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        reqs = self.fake_nodepool.getNodeRequests()
+
+        # The requests come back sorted by priority.
+
+        # Change A, first change for shared queue, high relative
+        # priority.
+        self.assertEqual(reqs[0]['_oid'], '100-0000000000')
+        self.assertEqual(reqs[0]['relative_priority'], 0)
+
+        # Change C, first change for independent project, high
+        # relative priority.
+        self.assertEqual(reqs[1]['_oid'], '100-0000000002')
+        self.assertEqual(reqs[1]['relative_priority'], 0)
+
+        # Change B, second change for shared queue, lower relative
+        # priority.
+        self.assertEqual(reqs[2]['_oid'], '100-0000000001')
+        self.assertEqual(reqs[2]['relative_priority'], 1)
+
+        self.fake_nodepool.unpause()
         self.waitUntilSettled()
 
     def test_nodepool_job_removal(self):
         "Test that nodes are returned unused after job removal"
 
-        self.fake_nodepool.paused = True
+        self.fake_nodepool.pause()
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('Code-Review', 2)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5108,7 +5189,7 @@ For CI problems and help debugging, contact ci@example.org"""
         self.sched.reconfigure(self.config)
         self.waitUntilSettled()
 
-        self.fake_nodepool.paused = False
+        self.fake_nodepool.unpause()
         self.waitUntilSettled()
 
         self.assertEqual(A.data['status'], 'MERGED')
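
Both new tests encode the same grouping rule: in the independent check pipeline, items are grouped per project; in the dependent gate pipeline, they are grouped per shared change queue. A standalone sketch of how positions within a group become relative priorities (plain Python, not the scheduler's real data structures):

from collections import defaultdict

# Enqueue order in a check pipeline: (change, grouping key).  For a
# dependent pipeline the key would be the shared queue instead.
items = [("A", "org/project"), ("B", "org/project"), ("C", "org/project1")]

positions = defaultdict(int)
priorities = {}
for change, group in items:
    priorities[change] = positions[group]   # index within its own group
    positions[group] += 1

print(priorities)  # {'A': 0, 'B': 1, 'C': 0} -- matches the check test above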

zuul/manager/__init__.py (+16, -1)

@@ -85,6 +85,11 @@ class PipelineManager(object):
                 return True
         return False
 
+    def getNodePriority(self, item):
+        items = self.pipeline.getAllItems()
+        items = [i for i in items if i.change.project == item.change.project]
+        return items.index(item)
+
     def isChangeAlreadyInPipeline(self, change):
         # Checks live items in the pipeline
         for item in self.pipeline.getAllItems():
@@ -327,8 +332,12 @@
             return False
         build_set = item.current_build_set
         self.log.debug("Requesting nodes for change %s" % item.change)
+        if self.sched.use_relative_priority:
+            priority = item.getNodePriority()
+        else:
+            priority = 0
         for job in jobs:
-            req = self.sched.nodepool.requestNodes(build_set, job)
+            req = self.sched.nodepool.requestNodes(build_set, job, priority)
             self.log.debug("Adding node request %s for job %s to item %s" %
                            (req, job, item))
             build_set.setJobNodeRequest(job.name, req)
@@ -687,6 +696,12 @@
         if failing_reasons:
             self.log.debug("%s is a failing item because %s" %
                            (item, failing_reasons))
+        if not dequeued and self.sched.use_relative_priority:
+            priority = item.getNodePriority()
+            for node_request in item.current_build_set.node_requests.values():
+                if node_request.relative_priority != priority:
+                    self.sched.nodepool.reviseRequest(
+                        node_request, priority)
         return (changed, nnfi)
 
     def processQueue(self):
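
getNodePriority returns the item's index among live items of the same project, so a request's relative priority improves as earlier items leave the pipeline; the hook added to the queue processor above re-checks it on every pass and revises any outstanding requests. A toy model of that recalculation, with hypothetical names standing in for the real pipeline objects:

class Item:
    def __init__(self, change, project):
        self.change = change
        self.project = project

def node_priority(pipeline_items, item):
    # Mirrors PipelineManager.getNodePriority above: index among live
    # items that belong to the same project.
    same_project = [i for i in pipeline_items if i.project == item.project]
    return same_project.index(item)

a = Item("A", "org/project")
b = Item("B", "org/project")
items = [a, b]
print(node_priority(items, b))  # 1: one same-project item is ahead of B

items.remove(a)                 # A merges or is dequeued
print(node_priority(items, b))  # 0: B's outstanding requests can be revised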

zuul/manager/dependent.py (+5, -0)

@@ -93,6 +93,11 @@ class DependentPipelineManager(PipelineManager):
             self.log.debug("Dynamically created queue %s", change_queue)
             return DynamicChangeQueueContextManager(change_queue)
 
+    def getNodePriority(self, item):
+        with self.getChangeQueue(item.change) as change_queue:
+            items = change_queue.queue
+            return items.index(item)
+
     def isChangeReadyToBeEnqueued(self, change):
         source = change.project.source
         if not source.canMerge(change, self.getSubmitAllowNeeds()):
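
For dependent pipelines the override above keys the priority off the item's position in its shared change queue rather than its project, so every project sharing a queue competes as a single group. A minimal illustration with plain lists standing in for ChangeQueue objects:

# Two changes share the "integrated" queue; a third project has its own.
integrated_queue = ["project1/A", "project2/B"]
project_queue = ["project/C"]

# Relative priority is simply the index within the owning queue.
print(integrated_queue.index("project1/A"))  # 0
print(integrated_queue.index("project2/B"))  # 1
print(project_queue.index("project/C"))      # 0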

zuul/model.py (+17, -4)

@@ -688,7 +688,7 @@ class NodeSet(ConfigObject):
 class NodeRequest(object):
     """A request for a set of nodes."""
 
-    def __init__(self, requestor, build_set, job, nodeset):
+    def __init__(self, requestor, build_set, job, nodeset, relative_priority):
         self.requestor = requestor
         self.build_set = build_set
         self.job = job
@@ -696,9 +696,12 @@
         self._state = STATE_REQUESTED
         self.requested_time = time.time()
         self.state_time = time.time()
+        self.created_time = None
         self.stat = None
         self.uid = uuid4().hex
+        self.relative_priority = relative_priority
         self.id = None
+        self._zk_data = {}  # Data that we read back from ZK
         # Zuul internal flags (not stored in ZK so they are not
         # overwritten).
         self.failed = False
@@ -731,17 +734,24 @@
         return '<NodeRequest %s %s>' % (self.id, self.nodeset)
 
     def toDict(self):
-        d = {}
+        # Start with any previously read data
+        d = self._zk_data.copy()
         nodes = [n.label for n in self.nodeset.getNodes()]
-        d['node_types'] = nodes
-        d['requestor'] = self.requestor
+        # These are immutable once set
+        d.setdefault('node_types', nodes)
+        d.setdefault('requestor', self.requestor)
+        d.setdefault('created_time', self.created_time)
+        # We might change these
         d['state'] = self.state
         d['state_time'] = self.state_time
+        d['relative_priority'] = self.relative_priority
         return d
 
     def updateFromDict(self, data):
+        self._zk_data = data
         self._state = data['state']
         self.state_time = data['state_time']
+        self.relative_priority = data['relative_priority']
 
 
 class Secret(ConfigObject):
@@ -2245,6 +2255,9 @@ class QueueItem(object):
             fakebuild.result = 'SKIPPED'
             self.addBuild(fakebuild)
 
+    def getNodePriority(self):
+        return self.pipeline.manager.getNodePriority(self)
+
     def formatUrlPattern(self, url_pattern, job=None, build=None):
         url = None
         # Produce safe versions of objects which may be useful in
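
The toDict() change above makes the request dictionary start from whatever was last read back from ZooKeeper, so fields written by Nodepool (such as the allocated node IDs) survive when Zuul rewrites the request; setdefault protects the fields that are immutable once set, while state, state_time and relative_priority are always refreshed. A small standalone demonstration of that merge behaviour, with a plain dict standing in for the znode data:

# Data previously read back from ZooKeeper; "nodes" was added by Nodepool.
zk_data = {"node_types": ["ubuntu-xenial"], "requestor": "sched1",
           "created_time": 1000.0, "state": "fulfilled",
           "state_time": 1005.0, "nodes": ["0000000042"]}

d = zk_data.copy()                              # start from the previous read
d.setdefault("node_types", ["ubuntu-xenial"])   # immutable once set
d.setdefault("requestor", "sched1")
d.setdefault("created_time", 1000.0)
d["state"] = "fulfilled"                        # these may change on rewrite
d["state_time"] = 1005.0
d["relative_priority"] = 0

assert d["nodes"] == ["0000000042"]             # Nodepool's data is preserved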

zuul/nodepool.py (+36, -2)

@@ -13,6 +13,7 @@
 import logging
 
 from zuul import model
+from zuul.zk import LockException
 
 
 class Nodepool(object):
@@ -51,11 +52,12 @@ class Nodepool(object):
             statsd.timing(key + '.size.%s' % len(request.nodeset.nodes), dt)
         statsd.gauge('zuul.nodepool.current_requests', len(self.requests))
 
-    def requestNodes(self, build_set, job):
+    def requestNodes(self, build_set, job, relative_priority):
         # Create a copy of the nodeset to represent the actual nodes
         # returned by nodepool.
         nodeset = job.nodeset.copy()
-        req = model.NodeRequest(self.sched.hostname, build_set, job, nodeset)
+        req = model.NodeRequest(self.sched.hostname, build_set, job,
+                                nodeset, relative_priority)
         self.requests[req.uid] = req
 
         if nodeset.nodes:
@@ -79,6 +81,38 @@ class Nodepool(object):
             except Exception:
                 self.log.exception("Error deleting node request:")
 
+    def reviseRequest(self, request, relative_priority=None):
+        '''Attempt to update the node request, if it is not currently being
+        processed.
+
+        :param NodeRequest request: The request to update.
+        :param int relative_priority: If supplied, the new relative
+            priority to set on the request.
+
+        '''
+        if relative_priority is None:
+            return
+        try:
+            self.sched.zk.lockNodeRequest(request, blocking=False)
+        except LockException:
+            # It may be locked by nodepool, which is fine.
+            self.log.debug("Unable to revise locked node request %s", request)
+            return False
+        try:
+            old_priority = request.relative_priority
+            request.relative_priority = relative_priority
+            self.sched.zk.storeNodeRequest(request)
+            self.log.debug("Revised relative priority of "
+                           "node request %s from %s to %s",
+                           request, old_priority, relative_priority)
+        except Exception:
+            self.log.exception("Unable to update node request %s", request)
+        finally:
+            try:
+                self.sched.zk.unlockNodeRequest(request)
+            except Exception:
+                self.log.exception("Unable to unlock node request %s", request)
+
     def holdNodeSet(self, nodeset, autohold_key):
         '''
         Perform a hold on the given set of nodes.
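
reviseRequest only touches a request when it can take the request lock without blocking; if Nodepool already holds the lock, the revision is skipped and simply retried on a later pass through the queue processor. A sketch of that try-lock/update/unlock shape, with a generic threading.Lock standing in for the ZooKeeper request lock (nothing here is Zuul API):

import threading

zk_lock = threading.Lock()   # stand-in for the per-request ZooKeeper lock

def revise(request, new_priority):
    if not zk_lock.acquire(blocking=False):
        # Nodepool is working on the request; try again on the next pass.
        return False
    try:
        request["relative_priority"] = new_priority  # then rewrite in ZK
        return True
    finally:
        zk_lock.release()

request = {"relative_priority": 2}
print(revise(request, 1), request)  # True {'relative_priority': 1}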

zuul/scheduler.py (+4, -0)

@@ -305,6 +305,10 @@ class Scheduler(threading.Thread):
         self.last_reconfigured = None
         self.tenant_last_reconfigured = {}
         self.autohold_requests = {}
+        self.use_relative_priority = False
+        if self.config.has_option('scheduler', 'relative_priority'):
+            if self.config.getboolean('scheduler', 'relative_priority'):
+                self.use_relative_priority = True
 
     def start(self):
         super(Scheduler, self).start()

zuul/zk.py (+96, -10)

@@ -41,6 +41,7 @@ class ZooKeeper(object):
     log = logging.getLogger("zuul.zk.ZooKeeper")
 
     REQUEST_ROOT = '/nodepool/requests'
+    REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
     NODE_ROOT = '/nodepool/nodes'
 
     # Log zookeeper retry every 10 seconds
@@ -162,8 +163,8 @@
             from ZooKeeper).  The watcher should return False when
             further updates are no longer necessary.
         '''
+        node_request.created_time = time.time()
         data = node_request.toDict()
-        data['created_time'] = time.time()
 
         path = '%s/%s-' % (self.REQUEST_ROOT, node_request.priority)
         path = self.client.create(path, self._dictToStr(data),
@@ -174,15 +175,7 @@
 
         def callback(data, stat):
             if data:
-                data = self._strToDict(data)
-                request_nodes = list(node_request.nodeset.getNodes())
-                for i, nodeid in enumerate(data.get('nodes', [])):
-                    node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
-                    node_data, node_stat = self.client.get(node_path)
-                    node_data = self._strToDict(node_data)
-                    request_nodes[i].id = nodeid
-                    request_nodes[i].updateFromDict(node_data)
-                node_request.updateFromDict(data)
+                self.updateNodeRequest(node_request, data)
             deleted = (data is None)  # data *are* none
             return watcher(node_request, deleted)
 
@@ -215,6 +208,34 @@
             return True
         return False
 
+    def storeNodeRequest(self, node_request):
+        '''Store the node request.
+
+        The request is expected to already exist and is updated in its
+        entirety.
+
+        :param NodeRequest node_request: The request to update.
+        '''
+
+        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+        self.client.set(path, self._dictToStr(node_request.toDict()))
+
+    def updateNodeRequest(self, node_request, data=None):
+        '''Refresh an existing node request.
+
+        :param NodeRequest node_request: The request to update.
+        :param dict data: The data to use; query ZK if absent.
+        '''
+        if data is None:
+            path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+            data, stat = self.client.get(path)
+        data = self._strToDict(data)
+        request_nodes = list(node_request.nodeset.getNodes())
+        for i, nodeid in enumerate(data.get('nodes', [])):
+            request_nodes[i].id = nodeid
+            self.updateNode(request_nodes[i], nodeid)
+        node_request.updateFromDict(data)
+
     def storeNode(self, node):
         '''Store the node.
 
@@ -227,6 +248,18 @@
         path = '%s/%s' % (self.NODE_ROOT, node.id)
         self.client.set(path, self._dictToStr(node.toDict()))
 
+    def updateNode(self, node, nodeid):
+        '''Refresh an existing node.
+
+        :param Node node: The node to update.
+        :param str nodeid: The ZooKeeper node ID.
+        '''
+
+        node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
+        node_data, node_stat = self.client.get(node_path)
+        node_data = self._strToDict(node_data)
+        node.updateFromDict(node_data)
+
     def lockNode(self, node, blocking=True, timeout=None):
         '''
         Lock a node.
@@ -268,6 +301,59 @@
         node.lock.release()
         node.lock = None
 
+    def lockNodeRequest(self, request, blocking=True, timeout=None):
+        '''
+        Lock a node request.
+
+        This will set the `lock` attribute of the request object when the
+        lock is successfully acquired.
+
+        :param NodeRequest request: The request to lock.
+        :param bool blocking: Whether or not to block on trying to
+            acquire the lock.
+        :param int timeout: When blocking, how long to wait for the lock
+            to get acquired. None, the default, waits forever.
+
+        :raises: TimeoutException if we failed to acquire the lock when
+            blocking with a timeout. ZKLockException if we are not blocking
+            and could not get the lock, or a lock is already held.
+        '''
+
+        path = "%s/%s" % (self.REQUEST_LOCK_ROOT, request.id)
+        try:
+            lock = Lock(self.client, path)
+            have_lock = lock.acquire(blocking, timeout)
+        except kze.LockTimeout:
+            raise LockException(
+                "Timeout trying to acquire lock %s" % path)
+        except kze.NoNodeError:
+            have_lock = False
+            self.log.error("Request not found for locking: %s", request)
+
+        # If we aren't blocking, it's possible we didn't get the lock
+        # because someone else has it.
+        if not have_lock:
+            raise LockException("Did not get lock on %s" % path)
+
+        request.lock = lock
+        self.updateNodeRequest(request)
+
+    def unlockNodeRequest(self, request):
+        '''
+        Unlock a node request.
+
+        The request must already have been locked.
+
+        :param NodeRequest request: The request to unlock.
+
+        :raises: ZKLockException if the request is not currently locked.
+        '''
+        if request.lock is None:
+            raise LockException(
+                "Request %s does not hold a lock" % request)
+        request.lock.release()
+        request.lock = None
+
     def heldNodeCount(self, autohold_key):
         '''
         Count the number of nodes being held for the given tenant/project/job.
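
Putting the new ZooKeeper primitives together, a revision is a lock, read, modify, write, unlock sequence against the request znode. A hedged sketch of that sequence using kazoo directly; it assumes a ZooKeeper server on 127.0.0.1:2181, an existing request znode, JSON-serialized request data, and a made-up request id:

import json

from kazoo.client import KazooClient
from kazoo.recipe.lock import Lock

REQUEST_ROOT = '/nodepool/requests'
REQUEST_LOCK_ROOT = '/nodepool/requests-lock'
request_id = '200-0000000001'  # hypothetical request id

client = KazooClient(hosts='127.0.0.1:2181')
client.start()

lock = Lock(client, '%s/%s' % (REQUEST_LOCK_ROOT, request_id))
if lock.acquire(blocking=False):       # skip if Nodepool holds the lock
    try:
        path = '%s/%s' % (REQUEST_ROOT, request_id)
        data, stat = client.get(path)
        request = json.loads(data.decode('utf8'))
        request['relative_priority'] = 0
        client.set(path, json.dumps(request).encode('utf8'))
    finally:
        lock.release()

client.stop()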
