# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.  You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
# License for the specific language governing permissions and limitations
# under the License.

import math
import logging
import json
import time

from nodepool.driver.utils import QuotaInformation, RateLimiter
from nodepool.driver import statemachine
from nodepool.zk import zookeeper as zk


"""
This driver behaves like a static driver except that the backing nodes
come from other Nodepool drivers.

The intent is that users will request nodes from this driver, and if
any nodes are already available, the request will be satisfied with
those first.  If not, this driver will request a new node from another
driver in Nodepool and add that to this driver's pool of available
nodes.

Each backing node may supply one or more nodes to the end user.  For
example, a user might request 3 nodes from this driver.  Having none
available, this driver would then request a single node from an AWS
driver.  Once that node is available, this driver might configure 8
terminal nodes all backed by the single AWS node, and fulfill the
request by allocating 3 of them.  If a further request arrived for 5
nodes, it would be fulfilled from the remaining 5 slots.  Once all 8
nodes have been returned, this driver will release the underlying AWS
node and the AWS driver will reclaim it.

To accomplish this, the process is roughly:

* Upon request, if insufficient nodes are available, request new
  backing node(s).  The requestor should be
  "NodePool:metastatic:{providername}".
* Lock backing nodes with a non-ephemeral lock (so that they persist
  even if this launcher is stopped) and use
  "NodePool:metastatic:{providername}" as the identifier.
* Update the Node.user_data field in ZK to include
  "NodePool:metastatic:{providername}" along with the label and slot
  count information for the backing node.
* Set the Node.driver_data field in ZK on the requested node to record
  which backing node and slot it is associated with.
* Periodically, delete unused backing nodes.

To identify our backing nodes:

  Our id is "NodePool:metastatic:{providername}".
  For every node with our id in user_data:
    If the node is locked and the first lock contender has our id as
    its identifier, this is one of our nodes.
  The first check is for efficiency; the second is to avoid issues
  with falsified user_data fields.

To clean up unused backing nodes:

  For each of our end nodes in use, mark its backing node as in use.
  If a backing node hasn't been in use for {grace-time} seconds, set
  the node to used and remove the lock on the backing node.

To identify our end nodes:

  Check that the node provider matches us, then consult the
  driver_data field to find which backing node it is assigned to.

This driver acts as both a provider and a user of Nodepool: it
provides requested nodes, and it uses backing nodes.  As a user, it
stores node allocation data in the "user_data" field of the backing
node.  As a provider, it stores node allocation data in the
"driver_data" field of the requested node.
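For illustration, those two records end up holding JSON roughly like
the following (the provider name, label, ids, and slot values here are
hypothetical):

  backing node user_data:
    {"owner": "NodePool:metastatic:my-provider",
     "label": "small-node", "slots": 8}

  requested node driver_data:
    {"metadata": {...}, "backing_node": "0000000042",
     "slot": 3, "node": "0000000123"}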
In order to avoid extra write calls to ZK on every allocation (and to
avoid race conditions that could cause double accounting errors), the
only data we store on the backing node is: that we own it, its label,
and the number of slots it supports.  On the requested node, we store
the backing node id and which slot in the backing node this node
occupies.  We have an in-memory proxy (BackingNodeRecord) for the
backing nodes to keep track of node allocation.  When we start up, we
initialize the proxy based on the data in ZK.
"""


class MetastaticInstance(statemachine.Instance):
    def __init__(self, backing_node, slot, node, metadata=None):
        super().__init__()
        if metadata:
            self.metadata = metadata
        else:
            self.metadata = node.driver_data['metadata']
        if backing_node:
            self.interface_ip = backing_node.interface_ip
            self.public_ipv4 = backing_node.public_ipv4
            self.public_ipv6 = backing_node.public_ipv6
            self.private_ipv4 = backing_node.private_ipv4
            self.az = backing_node.az
            self.region = backing_node.region
            # Image overrides:
            self.username = backing_node.username
            self.python_path = backing_node.python_path
            self.shell_type = backing_node.shell_type
            self.connection_port = backing_node.connection_port
            self.connection_type = backing_node.connection_type
            self.host_keys = backing_node.host_keys
            backing_node_id = backing_node.id
        else:
            backing_node_id = None
        self.driver_data = {
            'metadata': self.metadata,
            'backing_node': backing_node_id,
            'slot': slot,
            'node': node.id,
        }
        self.external_id = node.id

    def getQuotaInformation(self):
        return QuotaInformation(instances=1)


class MetastaticResource(statemachine.Resource):
    def __init__(self, metadata, type, name):
        super().__init__(metadata)
        self.type = type
        self.name = name


class MetastaticDeleteStateMachine(statemachine.StateMachine):
    DEALLOCATING = 'deallocating node'
    COMPLETE = 'complete'

    def __init__(self, adapter, external_id):
        super().__init__()
        self.adapter = adapter
        self.node_id = external_id

    def advance(self):
        if self.state == self.START:
            self.adapter._deallocateBackingNode(self.node_id)
            self.state = self.COMPLETE

        if self.state == self.COMPLETE:
            self.complete = True


class MetastaticCreateStateMachine(statemachine.StateMachine):
    REQUESTING = 'requesting backing node'
    ALLOCATING = 'allocating node'
    COMPLETE = 'complete'

    def __init__(self, adapter, hostname, label, image_external_id,
                 metadata, retries):
        super().__init__()
        self.adapter = adapter
        self.retries = retries
        self.attempts = 0
        self.image_external_id = image_external_id
        self.metadata = metadata
        self.hostname = hostname
        self.label = label
        self.node_id = metadata['nodepool_node_id']

    def advance(self):
        if self.state == self.START:
            self.backing_node_record, self.slot = \
                self.adapter._allocateBackingNode(
                    self.label, self.node_id)
            if self.backing_node_record.node_id is None:
                # We need to make a new request
                self.state = self.REQUESTING
            else:
                # We have an existing node
                self.state = self.COMPLETE
            self.external_id = self.node_id

        if self.state == self.REQUESTING:
            self.adapter._checkBackingNodeRequests()
            if self.backing_node_record.failed:
                raise Exception("Backing node failed")
            if self.backing_node_record.node_id is None:
                return
            self.state = self.COMPLETE

        if self.state == self.COMPLETE:
            backing_node = self.adapter._getNode(
                self.backing_node_record.node_id)
            node = self.adapter._getNode(self.node_id)
            instance = MetastaticInstance(backing_node, self.slot, node,
                                          self.metadata)
            self.complete = True
            return instance


class BackingNodeRecord:
    """An in-memory record of backing nodes and what nodes are
    allocated to them.
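
    A rough usage sketch (the label name and node ids here are
    hypothetical):

        record = BackingNodeRecord('small-node', slot_count=8)
        slot = record.allocateSlot('0000000123')  # first free slot, e.g. 0
        record.backsNode('0000000123')            # -> True
        record.deallocateSlot('0000000123')       # frees slot, updates last_used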
""" def __init__(self, label_name, slot_count): self.label_name = label_name self.slot_count = slot_count self.node_id = None self.request_id = None self.allocated_nodes = [None for x in range(slot_count)] self.failed = False self.last_used = time.time() def hasAvailableSlot(self): return None in self.allocated_nodes def isEmpty(self): return not any(self.allocated_nodes) def allocateSlot(self, node_id, slot_id=None): if slot_id is None: idx = self.allocated_nodes.index(None) else: idx = slot_id if self.allocated_nodes[idx] is not None: raise Exception("Slot %s of %s is already allocated", idx, self.node_id) self.allocated_nodes[idx] = node_id return idx def deallocateSlot(self, node_id): idx = self.allocated_nodes.index(node_id) self.allocated_nodes[idx] = None self.last_used = time.time() return idx def backsNode(self, node_id): return node_id in self.allocated_nodes class MetastaticAdapter(statemachine.Adapter): log = logging.getLogger("nodepool.driver.metastatic.MetastaticAdapter") def __init__(self, provider_config): self.provider = provider_config self.rate_limiter = RateLimiter(self.provider.name, self.provider.rate) self.backing_node_records = {} # label -> [BackingNodeRecord] self.pending_requests = [] # The requestor id self.my_id = f'NodePool:metastatic:{self.provider.name}' # On startup we need to recover our state from the ZK db, this # flag ensures we only do that once. self.performed_init = False @property def zk(self): return self._provider._zk def getCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, log): return MetastaticCreateStateMachine(self, hostname, label, image_external_id, metadata, retries) def getDeleteStateMachine(self, external_id, log): return MetastaticDeleteStateMachine(self, external_id) def listResources(self): self._init() # Since this is called periodically, this is a good place to # see about deleting unused backing nodes. now = time.time() for label_name, backing_node_records in \ self.backing_node_records.items(): for bnr in backing_node_records[:]: label_config = self.provider._getLabel(bnr.label_name) if label_config: grace_time = label_config.grace_time else: # The label doesn't exist in our config any more, # it must have been removed. grace_time = 0 if (bnr.isEmpty() and now - bnr.last_used > grace_time): self.log.info("Backing node %s has been idle for " "%s seconds, releasing", bnr.node_id, now - bnr.last_used) node = self._getNode(bnr.node_id) node.state = zk.USED self.zk.storeNode(node) self.zk.forceUnlockNode(node) backing_node_records.remove(bnr) return [] def deleteResource(self, resource): self.log.warning("Unhandled request to delete leaked " f"{resource.type}: {resource.name}") # Unused; change log message if we ever use this. 
    def listInstances(self):
        # We don't need this unless we're managing quota
        self._init()
        return []

    def getQuotaLimits(self):
        return QuotaInformation(default=math.inf)

    def getQuotaForLabel(self, label):
        return QuotaInformation(instances=1)

    # Local implementation below

    def _init(self):
        if self.performed_init:
            return
        self.log.debug("Performing init")

        # Find backing nodes
        backing_node_map = {}
        for node in self.zk.nodeIterator():
            try:
                user_data = json.loads(node.user_data)
            except Exception:
                continue
            if 'owner' not in user_data:
                continue
            if user_data['owner'] == self.my_id:
                # This may be a backing node for us, but double check
                contenders = self.zk.getNodeLockContenders(node)
                if contenders and contenders[0] == self.my_id:
                    # We hold the lock on this node
                    backing_node_record = BackingNodeRecord(
                        user_data['label'], user_data['slots'])
                    backing_node_record.node_id = node.id
                    self.log.info("Found backing node %s for %s",
                                  node.id, user_data['label'])
                    self._addBackingNode(user_data['label'],
                                         backing_node_record)
                    backing_node_map[node.id] = backing_node_record

        # Assign nodes to backing nodes
        for node in self.zk.nodeIterator():
            if node.provider == self.provider.name:
                if not node.driver_data:
                    continue
                bn_id = node.driver_data.get('backing_node')
                bn_slot = node.driver_data.get('slot')
                if bn_id and bn_id in backing_node_map:
                    backing_node_record = backing_node_map[bn_id]
                    backing_node_record.allocateSlot(node.id, bn_slot)
                    self.log.info("Found node %s assigned to backing node "
                                  "%s slot %s",
                                  node.id, backing_node_record.node_id,
                                  bn_slot)
        self.performed_init = True

    def _setProvider(self, provider):
        self._provider = provider

    def _allocateBackingNode(self, label, node_id):
        self._init()
        # If we have room for the label, allocate and return an
        # existing slot; otherwise, make a new backing node.
        backing_node_record = None
        for bnr in self.backing_node_records.get(label.name, []):
            if bnr.hasAvailableSlot():
                backing_node_record = bnr
                break
        if backing_node_record is None:
            req = zk.NodeRequest()
            req.node_types = [label.backing_label]
            req.state = zk.REQUESTED
            req.requestor = self.my_id
            self.zk.storeNodeRequest(req, priority='100')
            backing_node_record = BackingNodeRecord(
                label.name, label.max_parallel_jobs)
            backing_node_record.request_id = req.id
            self._addBackingNode(label.name, backing_node_record)
        backing_node_log = (backing_node_record.node_id or
                            f'request {backing_node_record.request_id}')
        slot = backing_node_record.allocateSlot(node_id)
        self.log.info("Assigned node %s to backing node %s slot %s",
                      node_id, backing_node_log, slot)
        return backing_node_record, slot

    def _addBackingNode(self, label_name, backing_node_record):
        nodelist = self.backing_node_records.setdefault(label_name, [])
        nodelist.append(backing_node_record)

    def _deallocateBackingNode(self, node_id):
        self._init()
        for label_name, backing_node_records in \
                self.backing_node_records.items():
            for bn in backing_node_records:
                if bn.backsNode(node_id):
                    slot = bn.deallocateSlot(node_id)
                    self.log.info(
                        "Unassigned node %s from backing node %s slot %s",
                        node_id, bn.node_id, slot)
                    return

    def _checkBackingNodeRequests(self):
        self._init()
        waiting_requests = {}
        for label_name, backing_node_records in \
                self.backing_node_records.items():
            for bnr in backing_node_records:
                if bnr.request_id:
                    waiting_requests[bnr.request_id] = bnr
        if not waiting_requests:
            return
        for request in self.zk.nodeRequestIterator():
            if request.id not in waiting_requests:
                continue
            if request.state == zk.FAILED:
                self.log.error("Backing request %s failed", request.id)
                for label_name, records in \
                        self.backing_node_records.items():
                    for bnr in records[:]:
                        if bnr.request_id == request.id:
                            bnr.failed = True
                            records.remove(bnr)
            if request.state == zk.FULFILLED:
                bnr = waiting_requests[request.id]
                node_id = request.nodes[0]
                self.log.info("Backing request %s fulfilled with node id %s",
                              request.id, node_id)
                node = self._getNode(node_id)
                self.zk.lockNode(node, blocking=True, timeout=30,
                                 ephemeral=False, identifier=self.my_id)
                node.user_data = json.dumps({
                    'owner': self.my_id,
                    'label': bnr.label_name,
                    'slots': bnr.slot_count,
                })
                node.state = zk.IN_USE
                self.zk.storeNode(node)
                self.zk.deleteNodeRequest(request)
                bnr.request_id = None
                bnr.node_id = node_id

    def _getNode(self, node_id):
        return self.zk.getNode(node_id)