# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import logging import threading import time from collections import defaultdict from zuul import model from zuul.lib import tracing from zuul.lib.logutil import get_annotated_logger from zuul.zk.event_queues import ( PipelineResultEventQueue, NodepoolEventElection ) from zuul.zk.exceptions import LockException from zuul.zk.nodepool import NodeRequestEvent, ZooKeeperNodepool class Nodepool(object): log = logging.getLogger('zuul.nodepool') # The kind of resources we report stats on. We need a complete # list in order to report 0 level gauges. resource_types = ('ram', 'cores', 'instances') def __init__(self, zk_client, system_id, statsd, scheduler=False): self._stopped = False self.system_id = system_id self.statsd = statsd self.election_won = False if scheduler: # Only enable the node request cache/callback for the scheduler. self.stop_watcher_event = threading.Event() self.zk_nodepool = ZooKeeperNodepool( zk_client, enable_node_request_cache=True, node_request_event_callback=self._handleNodeRequestEvent, connection_suspended_callback=self.stop_watcher_event, enable_node_cache=True) self.election = NodepoolEventElection(zk_client) self.event_thread = threading.Thread(target=self.runEventElection) self.event_thread.daemon = True self.event_thread.start() else: self.stop_watcher_event = None self.zk_nodepool = ZooKeeperNodepool(zk_client) self.election = None self.event_thread = None self.pipeline_result_events = PipelineResultEventQueue.createRegistry( zk_client ) def addResources(self, target, source): for key, value in source.items(): if key in self.resource_types: target[key] += value def runEventElection(self): while not self._stopped: try: self.log.debug("Running nodepool watcher election") self.election.run(self._electionWon) except Exception: self.log.exception("Error in nodepool watcher:") def stop(self): self.log.debug("Stopping") self._stopped = True if self.election: self.election.cancel() self.stop_watcher_event.set() self.event_thread.join() # Delete the election to avoid a FD leak in tests. del self.election def _sendNodesProvisionedEvent(self, request): tracing.endSavedSpan(request.span_info, attributes={ "request_id": request.id, "state": request.state, }) tenant_name = request.tenant_name pipeline_name = request.pipeline_name event = model.NodesProvisionedEvent(request.id, request.build_set_uuid) self.pipeline_result_events[tenant_name][pipeline_name].put(event) def _electionWon(self): self.log.info("Watching nodepool requests") # Iterate over every completed request in case we are starting # up or missed something in the transition. self.election_won = True try: for rid in self.zk_nodepool.getNodeRequests(): request = self.zk_nodepool.getNodeRequest(rid, cached=True) if request.requestor != self.system_id: continue if (request.state in {model.STATE_FULFILLED, model.STATE_FAILED}): self._sendNodesProvisionedEvent(request) # Now resume normal event processing. self.stop_watcher_event.wait() finally: self.stop_watcher_event.clear() self.election_won = False def _handleNodeRequestEvent(self, request, event): log = get_annotated_logger(self.log, event=request.event_id) if request.requestor != self.system_id: return log.debug("Node request %s %s", request, request.state) if event == NodeRequestEvent.COMPLETED: try: if self.election_won: if self.election.is_still_valid(): self.emitStats(request) self._sendNodesProvisionedEvent(request) else: self.stop_watcher_event.set() except Exception: # If there are any errors moving the event, re-run the # election. if self.stop_watcher_event is not None: self.stop_watcher_event.set() raise def emitStats(self, request): # Implements the following : # counter zuul.nodepool.requests..total # counter zuul.nodepool.requests..label.