Provide statsd client to Nodepool and make scheduler optional

To lock/unlock the nodes directly in the executor server, we have to
make the Nodepool API work without a scheduler instance.

To keep the stats emission intact, we provide a statsd client directly
to the Nodepool instance.

This leaves only one place where the scheduler is used in the Nodepool
class, which is the onNodesProvisioned() callback.
This callback will no longer be necessary once the nodes are locked on
the executor, and thus this function call and the scheduler parameter
itself can be removed.

Change-Id: I3f3e4bfff08e244f68a9be7c6a4efcc194a23332
This commit is contained in:
Felix Edel 2021-04-28 15:02:25 +02:00
parent b9a6190a45
commit ba7f81be2d
4 changed files with 52 additions and 43 deletions

View File

@ -39,7 +39,8 @@ class TestNodepoolIntegration(BaseTestCase):
self.provisioned_requests = [] self.provisioned_requests = []
# This class implements the scheduler methods zuul.nodepool # This class implements the scheduler methods zuul.nodepool
# needs, so we pass 'self' as the scheduler. # needs, so we pass 'self' as the scheduler.
self.nodepool = zuul.nodepool.Nodepool(self) self.nodepool = zuul.nodepool.Nodepool(
self.zk_client, self.hostname, self.statsd, self)
def waitForRequests(self): def waitForRequests(self):
# Wait until all requests are complete. # Wait until all requests are complete.

View File

@ -46,7 +46,8 @@ class TestNodepool(BaseTestCase):
self.provisioned_requests = [] self.provisioned_requests = []
# This class implements the scheduler methods zuul.nodepool # This class implements the scheduler methods zuul.nodepool
# needs, so we pass 'self' as the scheduler. # needs, so we pass 'self' as the scheduler.
self.nodepool = zuul.nodepool.Nodepool(self) self.nodepool = zuul.nodepool.Nodepool(
self.zk_client, self.hostname, self.statsd, self)
self.fake_nodepool = FakeNodepool(self.zk_chroot_fixture) self.fake_nodepool = FakeNodepool(self.zk_chroot_fixture)
self.addCleanup(self.fake_nodepool.stop) self.addCleanup(self.fake_nodepool.stop)

View File

@ -17,6 +17,7 @@ from collections import defaultdict
from zuul import model from zuul import model
from zuul.lib.logutil import get_annotated_logger from zuul.lib.logutil import get_annotated_logger
from zuul.zk.exceptions import LockException from zuul.zk.exceptions import LockException
from zuul.zk.nodepool import ZooKeeperNodepool
def add_resources(target, source): def add_resources(target, source):
@ -32,9 +33,16 @@ def subtract_resources(target, source):
class Nodepool(object): class Nodepool(object):
log = logging.getLogger('zuul.nodepool') log = logging.getLogger('zuul.nodepool')
def __init__(self, scheduler): def __init__(self, zk_client, hostname, statsd, scheduler=None):
self.requests = {} self.hostname = hostname
self.statsd = statsd
# TODO (felix): Remove the scheduler parameter once the nodes are
# locked on the executor side.
self.sched = scheduler self.sched = scheduler
self.zk_nodepool = ZooKeeperNodepool(zk_client)
self.requests = {}
self.current_resources_by_tenant = {} self.current_resources_by_tenant = {}
self.current_resources_by_project = {} self.current_resources_by_project = {}
@ -47,10 +55,9 @@ class Nodepool(object):
# timer zuul.nodepool.requests.(fulfilled|failed).<label> # timer zuul.nodepool.requests.(fulfilled|failed).<label>
# timer zuul.nodepool.requests.(fulfilled|failed).<size> # timer zuul.nodepool.requests.(fulfilled|failed).<size>
# gauge zuul.nodepool.current_requests # gauge zuul.nodepool.current_requests
if not self.sched.statsd: if not self.statsd:
return return
statsd = self.sched.statsd pipe = self.statsd.pipeline()
pipe = statsd.pipeline()
state = request.state state = request.state
dt = None dt = None
@ -75,55 +82,51 @@ class Nodepool(object):
pipe.send() pipe.send()
def emitStatsResources(self): def emitStatsResources(self):
if not self.sched.statsd:
return
statsd = self.sched.statsd
for tenant, resources in self.current_resources_by_tenant.items(): for tenant, resources in self.current_resources_by_tenant.items():
for resource, value in resources.items(): for resource, value in resources.items():
key = 'zuul.nodepool.resources.tenant.' \ key = 'zuul.nodepool.resources.tenant.' \
'{tenant}.{resource}' '{tenant}.{resource}'
statsd.gauge(key, value, tenant=tenant, resource=resource) self.statsd.gauge(key, value, tenant=tenant, resource=resource)
for project, resources in self.current_resources_by_project.items(): for project, resources in self.current_resources_by_project.items():
for resource, value in resources.items(): for resource, value in resources.items():
key = 'zuul.nodepool.resources.project.' \ key = 'zuul.nodepool.resources.project.' \
'{project}.{resource}' '{project}.{resource}'
statsd.gauge(key, value, project=project, resource=resource) self.statsd.gauge(
key, value, project=project, resource=resource)
def emitStatsResourceCounters(self, tenant, project, resources, duration): def emitStatsResourceCounters(self, tenant, project, resources, duration):
if not self.sched.statsd:
return
statsd = self.sched.statsd
for resource, value in resources.items(): for resource, value in resources.items():
key = 'zuul.nodepool.resources.tenant.{tenant}.{resource}' key = 'zuul.nodepool.resources.tenant.{tenant}.{resource}'
statsd.incr(key, value * duration, self.statsd.incr(
tenant=tenant, resource=resource) key, value * duration, tenant=tenant, resource=resource)
for resource, value in resources.items(): for resource, value in resources.items():
key = 'zuul.nodepool.resources.project.' \ key = 'zuul.nodepool.resources.project.' \
'{project}.{resource}' '{project}.{resource}'
statsd.incr(key, value * duration, self.statsd.incr(
project=project, resource=resource) key, value * duration, project=project, resource=resource)
def requestNodes(self, build_set, job, relative_priority, event=None): def requestNodes(self, build_set, job, relative_priority, event=None):
log = get_annotated_logger(self.log, event) log = get_annotated_logger(self.log, event)
# Create a copy of the nodeset to represent the actual nodes # Create a copy of the nodeset to represent the actual nodes
# returned by nodepool. # returned by nodepool.
nodeset = job.nodeset.copy() nodeset = job.nodeset.copy()
req = model.NodeRequest(self.sched.hostname, build_set, job, req = model.NodeRequest(self.hostname, build_set, job,
nodeset, relative_priority, event=event) nodeset, relative_priority, event=event)
self.requests[req.uid] = req self.requests[req.uid] = req
if nodeset.nodes: if nodeset.nodes:
self.sched.zk_nodepool.submitNodeRequest(req, self.zk_nodepool.submitNodeRequest(req, self._updateNodeRequest)
self._updateNodeRequest)
# Logged after submission so that we have the request id # Logged after submission so that we have the request id
log.info("Submitted node request %s", req) log.info("Submitted node request %s", req)
self.emitStats(req) self.emitStats(req)
else: else:
log.info("Fulfilling empty node request %s", req) log.info("Fulfilling empty node request %s", req)
req.state = model.STATE_FULFILLED req.state = model.STATE_FULFILLED
self.sched.onNodesProvisioned(req) if self.sched is not None:
# TODO (felix): Remove this call once the nodes are locked on
# the executor side.
self.sched.onNodesProvisioned(req)
del self.requests[req.uid] del self.requests[req.uid]
return req return req
@ -133,7 +136,7 @@ class Nodepool(object):
if request.uid in self.requests: if request.uid in self.requests:
request.canceled = True request.canceled = True
try: try:
self.sched.zk_nodepool.deleteNodeRequest(request) self.zk_nodepool.deleteNodeRequest(request)
except Exception: except Exception:
log.exception("Error deleting node request:") log.exception("Error deleting node request:")
@ -150,7 +153,7 @@ class Nodepool(object):
if relative_priority is None: if relative_priority is None:
return return
try: try:
self.sched.zk_nodepool.lockNodeRequest(request, blocking=False) self.zk_nodepool.lockNodeRequest(request, blocking=False)
except LockException: except LockException:
# It may be locked by nodepool, which is fine. # It may be locked by nodepool, which is fine.
log.debug("Unable to revise locked node request %s", request) log.debug("Unable to revise locked node request %s", request)
@ -158,7 +161,7 @@ class Nodepool(object):
try: try:
old_priority = request.relative_priority old_priority = request.relative_priority
request.relative_priority = relative_priority request.relative_priority = relative_priority
self.sched.zk_nodepool.storeNodeRequest(request) self.zk_nodepool.storeNodeRequest(request)
log.debug("Revised relative priority of " log.debug("Revised relative priority of "
"node request %s from %s to %s", "node request %s from %s to %s",
request, old_priority, relative_priority) request, old_priority, relative_priority)
@ -166,7 +169,7 @@ class Nodepool(object):
log.exception("Unable to update node request %s", request) log.exception("Unable to update node request %s", request)
finally: finally:
try: try:
self.sched.zk_nodepool.unlockNodeRequest(request) self.zk_nodepool.unlockNodeRequest(request)
except Exception: except Exception:
log.exception("Unable to unlock node request %s", request) log.exception("Unable to unlock node request %s", request)
@ -191,7 +194,7 @@ class Nodepool(object):
node.comment = request.reason node.comment = request.reason
if request.node_expiration: if request.node_expiration:
node.hold_expiration = request.node_expiration node.hold_expiration = request.node_expiration
self.sched.zk_nodepool.storeNode(node) self.zk_nodepool.storeNode(node)
request.nodes.append(dict( request.nodes.append(dict(
build=build.uuid, build=build.uuid,
@ -206,10 +209,10 @@ class Nodepool(object):
# Give ourselves a few seconds to try to obtain the lock rather than # Give ourselves a few seconds to try to obtain the lock rather than
# immediately give up. # immediately give up.
self.sched.zk_nodepool.lockHoldRequest(request, timeout=5) self.zk_nodepool.lockHoldRequest(request, timeout=5)
try: try:
self.sched.zk_nodepool.storeHoldRequest(request) self.zk_nodepool.storeHoldRequest(request)
except Exception: except Exception:
# If we fail to update the request count, we won't consider it # If we fail to update the request count, we won't consider it
# a real autohold error by passing the exception up. It will # a real autohold error by passing the exception up. It will
@ -220,7 +223,7 @@ class Nodepool(object):
finally: finally:
# Although any exceptions thrown here are handled higher up in # Although any exceptions thrown here are handled higher up in
# _doBuildCompletedEvent, we always want to try to unlock it. # _doBuildCompletedEvent, we always want to try to unlock it.
self.sched.zk_nodepool.unlockHoldRequest(request) self.zk_nodepool.unlockHoldRequest(request)
def useNodeSet(self, nodeset, build_set=None, event=None): def useNodeSet(self, nodeset, build_set=None, event=None):
self.log.info("Setting nodeset %s in use" % (nodeset,)) self.log.info("Setting nodeset %s in use" % (nodeset,))
@ -229,7 +232,7 @@ class Nodepool(object):
if node.lock is None: if node.lock is None:
raise Exception("Node %s is not locked" % (node,)) raise Exception("Node %s is not locked" % (node,))
node.state = model.STATE_IN_USE node.state = model.STATE_IN_USE
self.sched.zk_nodepool.storeNode(node) self.zk_nodepool.storeNode(node)
if node.resources: if node.resources:
add_resources(resources, node.resources) add_resources(resources, node.resources)
if build_set and resources: if build_set and resources:
@ -276,7 +279,7 @@ class Nodepool(object):
if node.resources: if node.resources:
add_resources(resources, node.resources) add_resources(resources, node.resources)
node.state = model.STATE_USED node.state = model.STATE_USED
self.sched.zk_nodepool.storeNode(node) self.zk_nodepool.storeNode(node)
except Exception: except Exception:
log.exception("Exception storing node %s " log.exception("Exception storing node %s "
"while unlocking:", node) "while unlocking:", node)
@ -304,7 +307,7 @@ class Nodepool(object):
def _unlockNodes(self, nodes): def _unlockNodes(self, nodes):
for node in nodes: for node in nodes:
try: try:
self.sched.zk_nodepool.unlockNode(node) self.zk_nodepool.unlockNode(node)
except Exception: except Exception:
self.log.exception("Error unlocking node:") self.log.exception("Error unlocking node:")
@ -322,7 +325,7 @@ class Nodepool(object):
raise Exception("Node %s allocated to %s, not %s" % raise Exception("Node %s allocated to %s, not %s" %
(node.id, node.allocated_to, request_id)) (node.id, node.allocated_to, request_id))
self.log.debug("Locking node %s" % (node,)) self.log.debug("Locking node %s" % (node,))
self.sched.zk_nodepool.lockNode(node, timeout=30) self.zk_nodepool.lockNode(node, timeout=30)
locked_nodes.append(node) locked_nodes.append(node)
except Exception: except Exception:
self.log.exception("Error locking nodes:") self.log.exception("Error locking nodes:")
@ -348,15 +351,18 @@ class Nodepool(object):
if deleted: if deleted:
log.debug("Resubmitting lost node request %s", request) log.debug("Resubmitting lost node request %s", request)
request.id = None request.id = None
self.sched.zk_nodepool.submitNodeRequest(request, self.zk_nodepool.submitNodeRequest(
self._updateNodeRequest) request, self._updateNodeRequest)
# Stop watching this request node # Stop watching this request node
return False return False
elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED): elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
log.info("Node request %s %s", request, request.state) log.info("Node request %s %s", request, request.state)
# Give our results to the scheduler. # Give our results to the scheduler.
self.sched.onNodesProvisioned(request) if self.sched is not None:
# TODO (felix): Remove this call once the nodes are locked on
# the executor side.
self.sched.onNodesProvisioned(request)
del self.requests[request.uid] del self.requests[request.uid]
self.emitStats(request) self.emitStats(request)
@ -398,13 +404,13 @@ class Nodepool(object):
# processing it. Nodepool will automatically reallocate the assigned # processing it. Nodepool will automatically reallocate the assigned
# nodes in that situation. # nodes in that situation.
try: try:
if not self.sched.zk_nodepool.nodeRequestExists(request): if not self.zk_nodepool.nodeRequestExists(request):
log.info("Request %s no longer exists, resubmitting", log.info("Request %s no longer exists, resubmitting",
request.id) request.id)
request.id = None request.id = None
request.state = model.STATE_REQUESTED request.state = model.STATE_REQUESTED
self.requests[request.uid] = request self.requests[request.uid] = request
self.sched.zk_nodepool.submitNodeRequest( self.zk_nodepool.submitNodeRequest(
request, self._updateNodeRequest) request, self._updateNodeRequest)
return False return False
except Exception: except Exception:
@ -431,7 +437,7 @@ class Nodepool(object):
# succeeded, delete the request. # succeeded, delete the request.
log.debug("Deleting node request %s", request) log.debug("Deleting node request %s", request)
try: try:
self.sched.zk_nodepool.deleteNodeRequest(request) self.zk_nodepool.deleteNodeRequest(request)
except Exception: except Exception:
log.exception("Error deleting node request:") log.exception("Error deleting node request:")
request.failed = True request.failed = True

View File

@ -219,7 +219,8 @@ class Scheduler(threading.Thread):
if not testonly: if not testonly:
self.executor = ExecutorClient(self.config, self) self.executor = ExecutorClient(self.config, self)
self.merger = self._merger_client_class(self.config, self) self.merger = self._merger_client_class(self.config, self)
self.nodepool = nodepool.Nodepool(self) self.nodepool = nodepool.Nodepool(
self.zk_client, self.hostname, self.statsd, self)
def start(self): def start(self):
super(Scheduler, self).start() super(Scheduler, self).start()