Merge "Store autohold requests in zookeeper"
This commit is contained in: commit 853c2e0834
@@ -39,6 +39,22 @@ Example::
 
   zuul autohold --tenant openstack --project example_project --job example_job --reason "reason text" --count 1
 
+Autohold Delete
+^^^^^^^^^^^^^^^
+.. program-output:: zuul autohold-delete --help
+
+Example::
+
+  zuul autohold-delete --id 0000000123
+
+Autohold List
+^^^^^^^^^^^^^
+.. program-output:: zuul autohold-list --help
+
+Example::
+
+  zuul autohold-list
+
 Dequeue
 ^^^^^^^
 .. program-output:: zuul dequeue --help
@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    Autohold requests are now stored in ZooKeeper, rather than in memory.
+    As a result of this change, a new zuul CLI command, ``autohold-delete``,
+    has been added to remove existing requests.
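For orientation, each request this note describes becomes a single JSON znode under the new /zuul/hold-requests path, so requests can be inspected out-of-band. A minimal sketch, assuming a local ZooKeeper on 127.0.0.1:2181 (the path and payload shape are taken from the zk.py and model.py hunks below):

    # Inspect stored autohold requests directly with kazoo (illustrative only).
    import json
    from kazoo.client import KazooClient

    client = KazooClient(hosts='127.0.0.1:2181')  # assumed ZooKeeper address
    client.start()
    for request_id in client.get_children('/zuul/hold-requests'):
        data, _stat = client.get('/zuul/hold-requests/' + request_id)
        # The payload is HoldRequest.toDict() as JSON: tenant, project, job,
        # ref_filter, max_count, current_count, reason, node_expiration.
        print(request_id, json.loads(data))
    client.stop()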
@@ -1632,6 +1632,18 @@ class TestScheduler(ZuulTestCase):
                             "", "", "reason text", 1)
         self.assertTrue(r)
 
+        # There should be a record in ZooKeeper
+        request_list = self.zk.getHoldRequests()
+        self.assertEqual(1, len(request_list))
+        request = self.zk.getHoldRequest(request_list[0])
+        self.assertIsNotNone(request)
+        self.assertEqual('tenant-one', request.tenant)
+        self.assertEqual('review.example.com/org/project', request.project)
+        self.assertEqual('project-test2', request.job)
+        self.assertEqual('reason text', request.reason)
+        self.assertEqual(1, request.max_count)
+        self.assertEqual(0, request.current_count)
+
         # First check that successful jobs do not autohold
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
@@ -1680,6 +1692,10 @@ class TestScheduler(ZuulTestCase):
         )
         self.assertEqual(held_node['comment'], "reason text")
 
+        # The hold request current_count should have incremented
+        request2 = self.zk.getHoldRequest(request.id)
+        self.assertEqual(request.current_count + 1, request2.current_count)
+
         # Another failed change should not hold any more nodes
         C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
         self.executor_server.failJob('project-test2', C)
@@ -1696,6 +1712,30 @@ class TestScheduler(ZuulTestCase):
             held_nodes += 1
         self.assertEqual(held_nodes, 1)
 
+        # request current_count should not have changed
+        request3 = self.zk.getHoldRequest(request2.id)
+        self.assertEqual(request2.current_count, request3.current_count)
+
+    @simple_layout('layouts/autohold.yaml')
+    def test_autohold_delete(self):
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+        self.addCleanup(client.shutdown)
+        r = client.autohold('tenant-one', 'org/project', 'project-test2',
+                            "", "", "reason text", 1)
+        self.assertTrue(r)
+
+        # There should be a record in ZooKeeper
+        request_list = self.zk.getHoldRequests()
+        self.assertEqual(1, len(request_list))
+        request = self.zk.getHoldRequest(request_list[0])
+        self.assertIsNotNone(request)
+
+        # Delete and verify no more requests
+        self.assertTrue(client.autohold_delete(request.id))
+        request_list = self.zk.getHoldRequests()
+        self.assertEqual([], request_list)
+
     def _test_autohold_scoped(self, change_obj, change, ref):
         client = zuul.rpcclient.RPCClient('127.0.0.1',
                                           self.gearman_server.port)
@@ -1920,20 +1960,15 @@ class TestScheduler(ZuulTestCase):
         self.assertTrue(r)
 
         autohold_requests = client.autohold_list()
-        self.assertNotEqual({}, autohold_requests)
-        self.assertEqual(1, len(autohold_requests.keys()))
+        self.assertNotEqual([], autohold_requests)
+        self.assertEqual(1, len(autohold_requests))
 
-        # The single dict key should be a CSV string value
-        key = list(autohold_requests.keys())[0]
-        tenant, project, job, ref_filter = key.split(',')
-
-        self.assertEqual('tenant-one', tenant)
-        self.assertIn('org/project', project)
-        self.assertEqual('project-test2', job)
-        self.assertEqual(".*", ref_filter)
-
-        # Note: the value is converted from set to list by json.
-        self.assertEqual([1, "reason text", None], autohold_requests[key])
+        request = autohold_requests[0]
+        self.assertEqual('tenant-one', request['tenant'])
+        self.assertIn('org/project', request['project'])
+        self.assertEqual('project-test2', request['job'])
+        self.assertEqual(".*", request['ref_filter'])
+        self.assertEqual("reason text", request['reason'])
 
     @simple_layout('layouts/three-projects.yaml')
     def test_dependent_behind_dequeue(self):
@@ -1295,17 +1295,15 @@ class TestTenantScopedWebApi(BaseTestWeb):
                                           self.gearman_server.port)
         self.addCleanup(client.shutdown)
         autohold_requests = client.autohold_list()
-        self.assertNotEqual({}, autohold_requests)
-        self.assertEqual(1, len(autohold_requests.keys()))
-        key = list(autohold_requests.keys())[0]
-        tenant, project, job, ref_filter = key.split(',')
-        self.assertEqual('tenant-one', tenant)
-        self.assertIn('org/project', project)
-        self.assertEqual('project-test2', job)
-        self.assertEqual(".*", ref_filter)
-        # Note: the value is converted from set to list by json.
-        self.assertEqual([1, "some reason", None], autohold_requests[key],
-                         autohold_requests[key])
+        self.assertNotEqual([], autohold_requests)
+        self.assertEqual(1, len(autohold_requests))
+        request = autohold_requests[0]
+        self.assertEqual('tenant-one', request['tenant'])
+        self.assertIn('org/project', request['project'])
+        self.assertEqual('project-test2', request['job'])
+        self.assertEqual(".*", request['ref_filter'])
+        self.assertEqual("some reason", request['reason'])
+        self.assertEqual(1, request['max_count'])
 
     def test_enqueue(self):
         """Test that the admin web interface can enqueue a change"""
@@ -0,0 +1,82 @@
+# Copyright 2019 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import testtools
+
+import zuul.zk
+from zuul import model
+
+from tests.base import BaseTestCase, ChrootedKazooFixture
+
+
+class TestZK(BaseTestCase):
+
+    def setUp(self):
+        super().setUp()
+
+        self.zk_chroot_fixture = self.useFixture(
+            ChrootedKazooFixture(self.id()))
+        self.zk_config = '%s:%s%s' % (
+            self.zk_chroot_fixture.zookeeper_host,
+            self.zk_chroot_fixture.zookeeper_port,
+            self.zk_chroot_fixture.zookeeper_chroot)
+
+        self.zk = zuul.zk.ZooKeeper()
+        self.addCleanup(self.zk.disconnect)
+        self.zk.connect(self.zk_config)
+
+    def _createRequest(self):
+        req = model.HoldRequest()
+        req.max_count = 1
+        req.reason = 'some reason'
+        req.node_expiration = 1
+        return req
+
+    def test_hold_requests_api(self):
+        # Test no requests returns empty list
+        self.assertEqual([], self.zk.getHoldRequests())
+
+        # Test get on non-existent request is None
+        self.assertIsNone(self.zk.getHoldRequest('anything'))
+
+        # Test creating a new request
+        req1 = self._createRequest()
+        self.zk.storeHoldRequest(req1)
+        self.assertIsNotNone(req1.id)
+        self.assertEqual(1, len(self.zk.getHoldRequests()))
+
+        # Test getting the request
+        req2 = self.zk.getHoldRequest(req1.id)
+        self.assertEqual(req1.toDict(), req2.toDict())
+
+        # Test updating the request
+        req2.reason = 'a new reason'
+        self.zk.storeHoldRequest(req2)
+        req2 = self.zk.getHoldRequest(req2.id)
+        self.assertNotEqual(req1.reason, req2.reason)
+
+        # Test lock operations
+        self.zk.lockHoldRequest(req2, blocking=False)
+        with testtools.ExpectedException(
+            zuul.zk.LockException,
+            "Timeout trying to acquire lock .*"
+        ):
+            self.zk.lockHoldRequest(req2, blocking=True, timeout=2)
+        self.zk.unlockHoldRequest(req2)
+        self.assertIsNone(req2.lock)
+
+        # Test deleting the request
+        self.zk.deleteHoldRequest(req1)
+        self.assertEqual([], self.zk.getHoldRequests())
@@ -202,6 +202,13 @@ class Client(zuul.cmd.ZuulApp):
                                   required=False, type=int, default=0)
         cmd_autohold.set_defaults(func=self.autohold)
 
+        cmd_autohold_delete = subparsers.add_parser(
+            'autohold-delete', help='delete autohold request')
+        cmd_autohold_delete.set_defaults(func=self.autohold_delete)
+        cmd_autohold_delete.add_argument('--id',
+                                         help='request ID',
+                                         required=True)
+
         cmd_autohold_list = subparsers.add_parser(
             'autohold-list', help='list autohold requests')
         cmd_autohold_list.add_argument('--tenant', help='tenant name',
@@ -430,28 +437,35 @@ class Client(zuul.cmd.ZuulApp):
                                 node_hold_expiration=node_hold_expiration)
         return r
 
+    def autohold_delete(self):
+        client = self.get_client()
+        return client.autohold_delete(self.args.id)
+
     def autohold_list(self):
         client = self.get_client()
         autohold_requests = client.autohold_list(tenant=self.args.tenant)
 
-        if len(autohold_requests.keys()) == 0:
+        if not autohold_requests:
             print("No autohold requests found")
             return True
 
         table = prettytable.PrettyTable(
             field_names=[
-                'Tenant', 'Project', 'Job', 'Ref Filter', 'Count', 'Reason'
+                'ID', 'Tenant', 'Project', 'Job', 'Ref Filter',
+                'Max Count', 'Reason'
             ])
 
-        for key, value in autohold_requests.items():
-            # The key comes to us as a CSV string because json doesn't like
-            # non-str keys.
-            tenant_name, project_name, job_name, ref_filter = key.split(',')
-            count, reason, node_hold_expiration = value
-
+        for request in autohold_requests:
             table.add_row([
-                tenant_name, project_name, job_name, ref_filter, count, reason
+                request['id'],
+                request['tenant'],
+                request['project'],
+                request['job'],
+                request['ref_filter'],
+                request['max_count'],
+                request['reason'],
             ])
 
         print(table)
         return True
+
@@ -4674,6 +4674,65 @@ class WebInfo(object):
         return d
 
 
+class HoldRequest(object):
+    def __init__(self):
+        self.lock = None
+        self.id = None
+        self.tenant = None
+        self.project = None
+        self.job = None
+        self.ref_filter = None
+        self.reason = None
+        self.node_expiration = None
+        # When max_count == current_count, hold request can no longer be used.
+        self.max_count = 1
+        self.current_count = 0
+
+    def __str__(self):
+        return "<HoldRequest %s: tenant=%s project=%s job=%s ref_filter=%s>" \
+            % (self.id, self.tenant, self.project, self.job, self.ref_filter)
+
+    @staticmethod
+    def fromDict(data):
+        '''
+        Return a new object from the given data dictionary.
+        '''
+        obj = HoldRequest()
+        obj.tenant = data.get('tenant')
+        obj.project = data.get('project')
+        obj.job = data.get('job')
+        obj.ref_filter = data.get('ref_filter')
+        obj.max_count = data.get('max_count')
+        obj.current_count = data.get('current_count')
+        obj.reason = data.get('reason')
+        obj.node_expiration = data.get('node_expiration')
+        return obj
+
+    def toDict(self):
+        '''
+        Return a dictionary representation of the object.
+        '''
+        d = dict()
+        d['id'] = self.id
+        d['tenant'] = self.tenant
+        d['project'] = self.project
+        d['job'] = self.job
+        d['ref_filter'] = self.ref_filter
+        d['max_count'] = self.max_count
+        d['current_count'] = self.current_count
+        d['reason'] = self.reason
+        d['node_expiration'] = self.node_expiration
+        return d
+
+    def serialize(self):
+        '''
+        Return a representation of the object as a string.
+
+        Used for storing the object data in ZooKeeper.
+        '''
+        return json.dumps(self.toDict()).encode('utf8')
+
+
 # AuthZ models
 
 class AuthZRule(object):
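The dict/serialize methods above define the storage format. A minimal round-trip sketch (hypothetical values; note that id is not restored by fromDict, it comes from the znode name, as getHoldRequest in the zk.py hunk shows):

    import json
    from zuul.model import HoldRequest

    req = HoldRequest()
    req.tenant = 'tenant-one'
    req.reason = 'debugging a flaky job'
    data = req.serialize()                 # JSON bytes, ready for a znode
    copy = HoldRequest.fromDict(json.loads(data))
    assert copy.tenant == req.tenant
    assert copy.id is None                 # assigned at store time from the path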
@@ -168,38 +168,46 @@ class Nodepool(object):
         except Exception:
             log.exception("Unable to unlock node request %s", request)
 
-    def holdNodeSet(self, nodeset, autohold_key):
+    def holdNodeSet(self, nodeset, request):
         '''
         Perform a hold on the given set of nodes.
 
         :param NodeSet nodeset: The object containing the set of nodes to hold.
-        :param set autohold_key: A set with the tenant/project/job names
-            associated with the given NodeSet.
+        :param HoldRequest request: Hold request associated with the NodeSet
         '''
         self.log.info("Holding nodeset %s" % (nodeset,))
-        (hold_iterations,
-         reason,
-         node_hold_expiration) = self.sched.autohold_requests[autohold_key]
        nodes = nodeset.getNodes()
 
         for node in nodes:
             if node.lock is None:
                 raise Exception("Node %s is not locked" % (node,))
             node.state = model.STATE_HOLD
-            node.hold_job = " ".join(autohold_key)
-            node.comment = reason
-            if node_hold_expiration:
-                node.hold_expiration = node_hold_expiration
+            node.hold_job = " ".join([request.tenant,
+                                      request.project,
+                                      request.job,
+                                      request.ref_filter])
+            node.comment = request.reason
+            if request.node_expiration:
+                node.hold_expiration = request.node_expiration
             self.sched.zk.storeNode(node)
 
-        # We remove the autohold when the number of nodes in hold
-        # is equal to or greater than (run iteration count can be
-        # altered) the number of nodes used in a single job run
-        # times the number of run iterations requested.
-        nodes_in_hold = self.sched.zk.heldNodeCount(autohold_key)
-        if nodes_in_hold >= len(nodes) * hold_iterations:
-            self.log.debug("Removing autohold for %s", autohold_key)
-            del self.sched.autohold_requests[autohold_key]
+        request.current_count += 1
+
+        # Give ourselves a few seconds to try to obtain the lock rather than
+        # immediately give up.
+        self.sched.zk.lockHoldRequest(request, timeout=5)
+
+        try:
+            self.sched.zk.storeHoldRequest(request)
+        except Exception:
+            # If we fail to update the request count, we won't consider it
+            # a real autohold error by passing the exception up. It will
+            # just get used more than the original count specified.
+            self.log.exception("Unable to update hold request %s:", request)
+        finally:
+            # Although any exceptions thrown here are handled higher up in
+            # _doBuildCompletedEvent, we always want to try to unlock it.
+            self.sched.zk.unlockHoldRequest(request)
 
     def useNodeSet(self, nodeset, build_set=None, event=None):
         self.log.info("Setting nodeset %s in use" % (nodeset,))
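The lock/store/unlock sequence above is the contract for mutating a shared request. A condensed sketch of the same pattern, assuming zk is a connected zuul.zk.ZooKeeper and request an existing HoldRequest (lockHoldRequest raises zuul.zk.LockException on timeout, per the zk.py hunk below):

    import zuul.zk

    try:
        zk.lockHoldRequest(request, timeout=5)   # sets request.lock
    except zuul.zk.LockException:
        pass  # another scheduler holds the lock; skip the update
    else:
        try:
            request.current_count += 1
            zk.storeHoldRequest(request)         # persist under the lock
        finally:
            zk.unlockHoldRequest(request)        # clears request.lock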
@@ -62,6 +62,10 @@ class RPCClient(object):
                 'node_hold_expiration': node_hold_expiration}
         return not self.submitJob('zuul:autohold', data).failure
 
+    def autohold_delete(self, request_id):
+        data = {'request_id': request_id}
+        return not self.submitJob('zuul:autohold_delete', data).failure
+
     # todo allow filtering per tenant, like in the REST API
     def autohold_list(self, *args, **kwargs):
         data = {}
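End to end, the new CLI verbs reduce to these two gearman calls. A minimal sketch, assuming a gearman server on the default port 4730 (the list-of-dicts return shape matches the updated tests above):

    import zuul.rpcclient

    client = zuul.rpcclient.RPCClient('127.0.0.1', 4730)
    for request in client.autohold_list():       # list of dicts from the scheduler
        print(request['id'], request['reason'])
        client.autohold_delete(request['id'])    # True on success
    client.shutdown()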
@@ -34,6 +34,7 @@ class RPCListener(object):
         self.jobs = {}
         functions = [
             'autohold',
+            'autohold_delete',
             'autohold_list',
             'allowed_labels_get',
             'dequeue',
@@ -92,16 +93,19 @@ class RPCListener(object):
             return
         job.sendWorkComplete()
 
+    def handle_autohold_delete(self, job):
+        args = json.loads(job.arguments)
+        request_id = args['request_id']
+        try:
+            self.sched.autohold_delete(request_id)
+        except Exception as e:
+            job.sendWorkException(str(e).encode('utf8'))
+            return
+        job.sendWorkComplete()
+
     def handle_autohold_list(self, job):
-        req = {}
-
-        # The json.dumps() call cannot handle dict keys that are not strings
-        # so we convert our key to a CSV string that the caller can parse.
-        for key, value in self.sched.autohold_requests.items():
-            new_key = ','.join(key)
-            req[new_key] = value
-
-        job.sendWorkComplete(json.dumps(req))
+        data = self.sched.autohold_list()
+        job.sendWorkComplete(json.dumps(data))
 
     def handle_autohold(self, job):
         args = json.loads(job.arguments)
@@ -40,7 +40,7 @@ from zuul.lib.logutil import get_annotated_logger
 from zuul.lib.statsd import get_statsd
 import zuul.lib.queue
 import zuul.lib.repl
-from zuul.model import Build
+from zuul.model import Build, HoldRequest
 
 COMMANDS = ['full-reconfigure', 'stop', 'repl', 'norepl']
 
@@ -326,7 +326,6 @@ class Scheduler(threading.Thread):
         self.zuul_version = zuul_version.release_string
         self.last_reconfigured = None
         self.tenant_last_reconfigured = {}
-        self.autohold_requests = {}
         self.use_relative_priority = False
         if self.config.has_option('scheduler', 'relative_priority'):
             if self.config.getboolean('scheduler', 'relative_priority'):
@@ -555,12 +554,57 @@ class Scheduler(threading.Thread):
     def autohold(self, tenant_name, project_name, job_name, ref_filter,
                  reason, count, node_hold_expiration):
         key = (tenant_name, project_name, job_name, ref_filter)
-        if count == 0 and key in self.autohold_requests:
-            self.log.debug("Removing autohold for %s", key)
-            del self.autohold_requests[key]
-        else:
-            self.log.debug("Autohold requested for %s", key)
-            self.autohold_requests[key] = (count, reason, node_hold_expiration)
+        self.log.debug("Autohold requested for %s", key)
+
+        request = HoldRequest()
+        request.tenant = tenant_name
+        request.project = project_name
+        request.job = job_name
+        request.ref_filter = ref_filter
+        request.reason = reason
+        request.max_count = count
+        request.node_expiration = node_hold_expiration
+
+        # No need to lock it since we are creating a new one.
+        self.zk.storeHoldRequest(request)
+
+    def autohold_list(self):
+        '''
+        Return current hold requests as a list of dicts.
+        '''
+        data = []
+        for request_id in self.zk.getHoldRequests():
+            request = self.zk.getHoldRequest(request_id)
+            if not request:
+                continue
+            data.append(request.toDict())
+        return data
+
+    def autohold_delete(self, hold_request_id):
+        '''
+        Delete an autohold request.
+
+        :param str hold_request_id: The unique ID of the request to delete.
+        '''
+        try:
+            hold_request = self.zk.getHoldRequest(hold_request_id)
+        except Exception:
+            self.log.exception(
+                "Error retrieving autohold ID %s:", hold_request_id)
+            return
+        if not hold_request:
+            self.log.info("Ignored request to remove invalid autohold ID %s",
+                          hold_request_id)
+            return
+
+        # (TODO): Release any nodes held for this request here
+
+        self.log.debug("Removing autohold %s", hold_request)
+        try:
+            self.zk.deleteHoldRequest(hold_request)
+        except Exception:
+            self.log.exception(
+                "Error removing autohold request %s:", hold_request)
 
     def promote(self, tenant_name, pipeline_name, change_ids):
         event = PromoteEvent(tenant_name, pipeline_name, change_ids)
@@ -1232,7 +1276,7 @@ class Scheduler(threading.Thread):
             return
         pipeline.manager.onBuildPaused(event.build)
 
-    def _getAutoholdRequestKey(self, build):
+    def _getAutoholdRequest(self, build):
         change = build.build_set.item.change
 
         autohold_key_base = (build.pipeline.tenant.name,
@@ -1254,16 +1298,6 @@ class Scheduler(threading.Thread):
             CHANGE = 2
             REF = 3
 
-        def autohold_key_base_issubset(base, request_key):
-            """check whether the given key is a subset of the build key"""
-            index = 0
-            base_len = len(base)
-            while index < base_len:
-                if base[index] != request_key[index]:
-                    return False
-                index += 1
-            return True
-
         # Do a partial match of the autohold key against all autohold
         # requests, ignoring the last element of the key (ref filter),
         # and finally do a regex match between ref filter from
@@ -1271,13 +1305,23 @@ class Scheduler(threading.Thread):
         # if it matches. Lastly, make sure that we match the most
         # specific autohold request by comparing "scopes"
        # of requests - the most specific is selected.
-        autohold_key = None
+        autohold = None
         scope = Scope.NONE
         self.log.debug("Checking build autohold key %s", autohold_key_base)
-        for request in self.autohold_requests:
-            ref_filter = request[-1]
-            if not autohold_key_base_issubset(autohold_key_base, request) \
-                or not re.match(ref_filter, change.ref):
+        for request_id in self.zk.getHoldRequests():
+            request = self.zk.getHoldRequest(request_id)
+            if not request:
                 continue
+            ref_filter = request.ref_filter
+
+            if request.current_count >= request.max_count:
+                # This request has been used the max number of times
+                continue
+            elif not (request.tenant == autohold_key_base[0] and
+                      request.project == autohold_key_base[1] and
+                      request.job == autohold_key_base[2]):
+                continue
+            elif not re.match(ref_filter, change.ref):
+                continue
 
             if ref_filter == ".*":
@@ -1291,9 +1335,9 @@ class Scheduler(threading.Thread):
                            autohold_key_base, candidate_scope)
             if candidate_scope > scope:
                 scope = candidate_scope
-                autohold_key = request
+                autohold = request
 
-        return autohold_key
+        return autohold
 
     def _processAutohold(self, build):
         # We explicitly only want to hold nodes for jobs if they have
@@ -1302,10 +1346,10 @@ class Scheduler(threading.Thread):
         if build.result not in hold_list:
             return
 
-        autohold_key = self._getAutoholdRequestKey(build)
-        self.log.debug("Got autohold key %s", autohold_key)
-        if autohold_key is not None:
-            self.nodepool.holdNodeSet(build.nodeset, autohold_key)
+        request = self._getAutoholdRequest(build)
+        self.log.debug("Got autohold %s", request)
+        if request is not None:
+            self.nodepool.holdNodeSet(build.nodeset, request)
 
     def _doBuildCompletedEvent(self, event):
         build = event.build
@@ -417,19 +417,19 @@ class ZuulWebAPI(object):
         else:
             payload = json.loads(job.data[0])
             result = []
-            for key in payload:
-                _tenant, _project, job, ref_filter = key.split(',')
-                count, reason, hold_expiration = payload[key]
-                if tenant == _tenant:
-                    if project is None or _project.endswith(project):
+            for request in payload:
+                if tenant == request['tenant']:
+                    if (project is None or
+                            request['project'].endswith(project)):
                         result.append(
-                            {'tenant': _tenant,
-                             'project': _project,
-                             'job': job,
-                             'ref_filter': ref_filter,
-                             'count': count,
-                             'reason': reason,
-                             'node_hold_expiration': hold_expiration})
+                            {'tenant': request['tenant'],
+                             'project': request['project'],
+                             'job': request['job'],
+                             'ref_filter': request['ref_filter'],
+                             'count': request['max_count'],
+                             'reason': request['reason'],
+                             'node_hold_expiration': request['node_expiration']
+                             })
             return result
 
     @cherrypy.expose
zuul/zk.py
@@ -43,6 +43,7 @@ class ZooKeeper(object):
     REQUEST_ROOT = '/nodepool/requests'
     REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
     NODE_ROOT = '/nodepool/nodes'
+    HOLD_REQUEST_ROOT = '/zuul/hold-requests'
 
     # Log zookeeper retry every 10 seconds
     retry_log_rate = 10
@@ -459,6 +460,105 @@ class ZooKeeper(object):
             if node:
                 yield node
 
+    def getHoldRequests(self):
+        '''
+        Get the current list of all hold requests.
+        '''
+        try:
+            return self.client.get_children(self.HOLD_REQUEST_ROOT)
+        except kze.NoNodeError:
+            return []
+
+    def getHoldRequest(self, hold_request_id):
+        path = self.HOLD_REQUEST_ROOT + "/" + hold_request_id
+        try:
+            data, stat = self.client.get(path)
+        except kze.NoNodeError:
+            return None
+        if not data:
+            return None
+
+        obj = zuul.model.HoldRequest.fromDict(self._strToDict(data))
+        obj.id = hold_request_id
+        return obj
+
+    def storeHoldRequest(self, hold_request):
+        '''
+        Create or update a hold request.
+
+        If this is a new request with no value for the `id` attribute of the
+        passed in request, then `id` will be set with the unique request
+        identifier after successful creation.
+
+        :param HoldRequest hold_request: Object representing the hold request.
+        '''
+        if hold_request.id is None:
+            path = self.client.create(
+                self.HOLD_REQUEST_ROOT + "/",
+                value=hold_request.serialize(),
+                sequence=True,
+                makepath=True)
+            hold_request.id = path.split('/')[-1]
+        else:
+            path = self.HOLD_REQUEST_ROOT + "/" + hold_request.id
+            self.client.set(path, hold_request.serialize())
+
+    def deleteHoldRequest(self, hold_request):
+        '''
+        Delete a hold request.
+
+        :param HoldRequest hold_request: Object representing the hold request.
+        '''
+        path = self.HOLD_REQUEST_ROOT + "/" + hold_request.id
+        try:
+            self.client.delete(path, recursive=True)
+        except kze.NoNodeError:
+            pass
+
+    def lockHoldRequest(self, request, blocking=True, timeout=None):
+        '''
+        Lock a hold request.
+
+        This will set the `lock` attribute of the request object when the
+        lock is successfully acquired.
+
+        :param HoldRequest request: The hold request to lock.
+        '''
+        if not request.id:
+            raise LockException(
+                "Hold request without an ID cannot be locked: %s" % request)
+
+        path = "%s/%s/lock" % (self.HOLD_REQUEST_ROOT, request.id)
+        try:
+            lock = Lock(self.client, path)
+            have_lock = lock.acquire(blocking, timeout)
+        except kze.LockTimeout:
+            raise LockException(
+                "Timeout trying to acquire lock %s" % path)
+
+        # If we aren't blocking, it's possible we didn't get the lock
+        # because someone else has it.
+        if not have_lock:
+            raise LockException("Did not get lock on %s" % path)
+
+        request.lock = lock
+
+    def unlockHoldRequest(self, request):
+        '''
+        Unlock a hold request.
+
+        The request must already have been locked.
+
+        :param HoldRequest request: The request to unlock.
+
+        :raises: LockException if the request is not currently locked.
+        '''
+        if request.lock is None:
+            raise LockException(
+                "Request %s does not hold a lock" % request)
+        request.lock.release()
+        request.lock = None
+
+
 class Launcher():
     '''
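One subtlety worth noting: lockHoldRequest creates the lock znode as a child of the request znode itself, which is why deleteHoldRequest passes recursive=True; a request that was ever locked is no longer a leaf. A sketch of the assumed layout:

    # /zuul/hold-requests/0000000123        JSON payload (the request itself)
    # /zuul/hold-requests/0000000123/lock   kazoo Lock contenders live here
    request_path = '/zuul/hold-requests/0000000123'   # hypothetical request ID
    lock_path = request_path + '/lock'                # as lockHoldRequest builds it
    # A non-recursive delete of request_path would raise kazoo's NotEmptyError
    # once the lock path has ever been created.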