# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import math
import logging
import json
import time

from nodepool.driver.utils import QuotaInformation, RateLimiter
from nodepool.driver import statemachine
from nodepool.zk import zookeeper as zk


""" This driver behaves like a static driver execpt that the backing
|
|
nodes come from other Nodepool drivers.
|
|
|
|
The intent is that users will request nodes from this driver, and if
|
|
any nodes already exist, the request will be satisfied with those
|
|
first. If not, then this driver will request a new node from another
|
|
driver in Nodepool, and then add that to this driver's pool of
|
|
available nodes. Each backing node may supply one or more nodes to
|
|
the end user.
|
|
|
|
For example, a user might request 3 nodes from this driver. Having
|
|
none available, this driver would then request a single node from an
|
|
AWS driver. Once that node is available, this driver might configure
|
|
8 terminal nodes all backed by the single AWS node, and fulfill the
|
|
request by allocating 3 of them.
|
|
|
|
If a further request arrived for 5 nodes, they would be fulfilled from
|
|
the remaining 5 slots.
|
|
|
|
Once all 8 nodes have been returned, this driver will release the
|
|
underlying AWS node and the AWS driver will reclaim it.
|
|
|
|
To accomplish this, the process is roughly:

* Upon request, if insufficient nodes are available, request new backing
  node(s). The requestor should be "NodePool:metastatic:{providername}".

* Lock backing nodes with a non-ephemeral lock (so that they persist
  even if this launcher is stopped) and use
  "NodePool:metastatic:{providername}" as the identifier.

* Update the Node.user_data field in ZK to include
  "NodePool:metastatic:{providername}" along with label and slot count
  information for the backing node.

* Set the Node.driver_data field in ZK to include the backing node and
  slot information of which backing node the requested node is
  associated with.

* Periodically, delete unused backing nodes.

To identify our backing nodes:
  Our id is: "NodePool:metastatic:{providername}".
  For every node with our id in user_data:
    If the node is locked and the first lock contender has our id as
    its identifier:
      This is one of our nodes.
The first check is for efficiency, the second is to avoid issues with
falsified user_data fields.

To clean up unused backing nodes:
  For each of our end nodes in use: mark its backing node in use.

  If a backing node hasn't been in use for {grace-time} seconds,
  set the node to used and remove the lock on the backing node.

To identify our end nodes:
  Check that the node provider matches us, and then consult the
  driver_data field to find which backing node it's assigned to.

This driver acts as both a provider and a user of Nodepool. It
provides requested nodes and it uses backing nodes.

As a user, it stores node allocation data in the "user_data" field of
the backing node. As a provider, it stores node allocation in the
"driver_data" field of the requested node.

In order to avoid extra write calls to ZK on every allocation (and to
avoid race conditions that could cause double accounting errors), the
only data we store on the backing node is: that we own it, its label,
and the number of slots it supports. On the requested node, we store
the backing node id and which slot in the backing node this node
occupies.
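
As a rough illustration (the field names match the code below; the ids
and label values are made up), a backing node's user_data might look
like {"owner": "NodePool:metastatic:meta-provider", "label":
"small-node", "slots": 8}, and a requested node's driver_data might
look like {"metadata": {...}, "backing_node": "0000000123", "slot": 3,
"node": "0000000456"}.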

We have an in-memory proxy (BackingNodeRecord) for the backing nodes
to keep track of node allocation. When we start up, we initialize the
proxy based on the data in ZK.
"""


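# MetastaticInstance represents an end node handed to the requestor. It
# presents the backing node's connection details (IPs, port, type, host
# keys, image settings) as its own, and records in driver_data which
# backing node and slot this node occupies.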
class MetastaticInstance(statemachine.Instance):
    def __init__(self, backing_node, slot, node, metadata=None):
        super().__init__()
        if metadata:
            self.metadata = metadata
        else:
            self.metadata = node.driver_data['metadata']
        if backing_node:
            self.interface_ip = backing_node.interface_ip
            self.public_ipv4 = backing_node.public_ipv4
            self.public_ipv6 = backing_node.public_ipv6
            self.private_ipv4 = backing_node.private_ipv4
            self.az = backing_node.az
            self.region = backing_node.region
            # Image overrides:
            self.username = backing_node.username
            self.python_path = backing_node.python_path
            self.shell_type = backing_node.shell_type
            self.connection_port = backing_node.connection_port
            self.connection_type = backing_node.connection_type
            self.host_keys = backing_node.host_keys
            backing_node_id = backing_node.id
        else:
            backing_node_id = None
        self.driver_data = {
            'metadata': self.metadata,
            'backing_node': backing_node_id,
            'slot': slot,
            'node': node.id,
        }
        self.external_id = node.id

    def getQuotaInformation(self):
        return QuotaInformation(instances=1)


class MetastaticResource(statemachine.Resource):
    def __init__(self, metadata, type, name):
        super().__init__(metadata)
        self.type = type
        self.name = name


class MetastaticDeleteStateMachine(statemachine.StateMachine):
    DEALLOCATING = 'deallocating node'
    COMPLETE = 'complete'

    def __init__(self, adapter, external_id):
        super().__init__()
        self.adapter = adapter
        self.node_id = external_id

    def advance(self):
        if self.state == self.START:
            self.adapter._deallocateBackingNode(self.node_id)
            self.state = self.COMPLETE

        if self.state == self.COMPLETE:
            self.complete = True


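# The create state machine first tries to allocate a slot on an existing
# backing node; if none has a free slot, it files a new node request and
# waits in REQUESTING until that request is fulfilled (or fails) before
# completing.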
class MetastaticCreateStateMachine(statemachine.StateMachine):
    REQUESTING = 'requesting backing node'
    ALLOCATING = 'allocating node'
    COMPLETE = 'complete'

    def __init__(self, adapter, hostname, label, image_external_id,
                 metadata, retries):
        super().__init__()
        self.adapter = adapter
        self.retries = retries
        self.attempts = 0
        self.image_external_id = image_external_id
        self.metadata = metadata
        self.hostname = hostname
        self.label = label
        self.node_id = metadata['nodepool_node_id']

    def advance(self):
        if self.state == self.START:
            self.backing_node_record, self.slot = \
                self.adapter._allocateBackingNode(
                    self.label, self.node_id)
            if self.backing_node_record.node_id is None:
                # We need to make a new request
                self.state = self.REQUESTING
            else:
                # We have an existing node
                self.state = self.COMPLETE
            self.external_id = self.node_id

        if self.state == self.REQUESTING:
            self.adapter._checkBackingNodeRequests()
            if self.backing_node_record.failed:
                raise Exception("Backing node failed")
            if self.backing_node_record.node_id is None:
                return
            self.state = self.COMPLETE

        if self.state == self.COMPLETE:
            backing_node = self.adapter._getNode(
                self.backing_node_record.node_id)
            node = self.adapter._getNode(self.node_id)
            instance = MetastaticInstance(backing_node, self.slot,
                                          node, self.metadata)
            self.complete = True
            return instance


class BackingNodeRecord:
    """An in-memory record of backing nodes and what nodes are allocated
    to them.
    """
    def __init__(self, label_name, slot_count):
        self.label_name = label_name
        self.slot_count = slot_count
        self.node_id = None
        self.request_id = None
        self.allocated_nodes = [None for x in range(slot_count)]
        self.failed = False
        self.last_used = time.time()

    def hasAvailableSlot(self):
        return None in self.allocated_nodes

    def isEmpty(self):
        return not any(self.allocated_nodes)

    def allocateSlot(self, node_id, slot_id=None):
        if slot_id is None:
            idx = self.allocated_nodes.index(None)
        else:
            idx = slot_id
        if self.allocated_nodes[idx] is not None:
            raise Exception("Slot %s of %s is already allocated" %
                            (idx, self.node_id))
        self.allocated_nodes[idx] = node_id
        return idx

    def deallocateSlot(self, node_id):
        idx = self.allocated_nodes.index(node_id)
        self.allocated_nodes[idx] = None
        self.last_used = time.time()
        return idx

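    # Illustration (node ids made up): with slot_count=2, allocated_nodes
    # starts as [None, None]; allocateSlot('0001') fills slot 0 and
    # backsNode('0001') then returns True; deallocateSlot('0001') frees the
    # slot again and refreshes last_used for the idle check in
    # listResources().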
    def backsNode(self, node_id):
        return node_id in self.allocated_nodes


class MetastaticAdapter(statemachine.Adapter):
    log = logging.getLogger("nodepool.driver.metastatic.MetastaticAdapter")

    def __init__(self, provider_config):
        self.provider = provider_config
        self.rate_limiter = RateLimiter(self.provider.name,
                                        self.provider.rate)
        self.backing_node_records = {}  # label -> [BackingNodeRecord]
        self.pending_requests = []
        # The requestor id
        self.my_id = f'NodePool:metastatic:{self.provider.name}'
        # On startup we need to recover our state from the ZK db, this
        # flag ensures we only do that once.
        self.performed_init = False

    @property
    def zk(self):
        return self._provider._zk

    def getCreateStateMachine(self, hostname, label,
                              image_external_id, metadata, retries, log):
        return MetastaticCreateStateMachine(self, hostname, label,
                                            image_external_id, metadata,
                                            retries)

    def getDeleteStateMachine(self, external_id, log):
        return MetastaticDeleteStateMachine(self, external_id)

    def listResources(self):
        self._init()
        # Since this is called periodically, this is a good place to
        # see about deleting unused backing nodes.
        now = time.time()
        for label_name, backing_node_records in \
                self.backing_node_records.items():
            for bnr in backing_node_records[:]:
                label_config = self.provider._getLabel(bnr.label_name)
                if label_config:
                    grace_time = label_config.grace_time
                else:
                    # The label doesn't exist in our config any more,
                    # it must have been removed.
                    grace_time = 0
                if (bnr.isEmpty() and
                        now - bnr.last_used > grace_time):
                    self.log.info("Backing node %s has been idle for "
                                  "%s seconds, releasing",
                                  bnr.node_id, now - bnr.last_used)
                    node = self._getNode(bnr.node_id)
                    node.state = zk.USED
                    self.zk.storeNode(node)
                    self.zk.forceUnlockNode(node)
                    backing_node_records.remove(bnr)
        return []

    def deleteResource(self, resource):
        self.log.warning("Unhandled request to delete leaked "
                         f"{resource.type}: {resource.name}")
        # Unused; change log message if we ever use this.

    def listInstances(self):
        # We don't need this unless we're managing quota
        self._init()
        return []

    def getQuotaLimits(self):
        return QuotaInformation(default=math.inf)

    def getQuotaForLabel(self, label):
        return QuotaInformation(instances=1)

    # Local implementation below

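    # Recovery on startup happens in two passes: first find the backing
    # nodes we own (our id in user_data and we hold the first, non-ephemeral
    # lock), then walk this provider's end nodes and re-populate each
    # backing node's slot assignments from their driver_data.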
    def _init(self):
        if self.performed_init:
            return
        self.log.debug("Performing init")
        # Find backing nodes
        backing_node_map = {}
        for node in self.zk.nodeIterator():
            try:
                user_data = json.loads(node.user_data)
            except Exception:
                continue
            if 'owner' not in user_data:
                continue
            if user_data['owner'] == self.my_id:
                # This may be a backing node for us, but double check
                contenders = self.zk.getNodeLockContenders(node)
                if contenders and contenders[0] == self.my_id:
                    # We hold the lock on this node
                    backing_node_record = BackingNodeRecord(user_data['label'],
                                                            user_data['slots'])
                    backing_node_record.node_id = node.id
                    self.log.info("Found backing node %s for %s",
                                  node.id, user_data['label'])
                    self._addBackingNode(user_data['label'],
                                         backing_node_record)
                    backing_node_map[node.id] = backing_node_record
        # Assign nodes to backing nodes
        for node in self.zk.nodeIterator():
            if node.provider == self.provider.name:
                if not node.driver_data:
                    continue
                bn_id = node.driver_data.get('backing_node')
                bn_slot = node.driver_data.get('slot')
                if bn_id and bn_id in backing_node_map:
                    backing_node_record = backing_node_map[bn_id]
                    backing_node_record.allocateSlot(node.id, bn_slot)
                    self.log.info("Found node %s assigned to backing node %s "
                                  "slot %s",
                                  node.id, backing_node_record.node_id,
                                  bn_slot)
        self.performed_init = True

    def _setProvider(self, provider):
        self._provider = provider

    def _allocateBackingNode(self, label, node_id):
        self._init()
        # if we have room for the label, allocate and return existing slot
        # otherwise, make a new backing node
        backing_node_record = None
        for bnr in self.backing_node_records.get(label.name, []):
            if bnr.hasAvailableSlot():
                backing_node_record = bnr
                break
        if backing_node_record is None:
            req = zk.NodeRequest()
            req.node_types = [label.backing_label]
            req.state = zk.REQUESTED
            req.requestor = self.my_id
            self.zk.storeNodeRequest(req, priority='100')
            backing_node_record = BackingNodeRecord(
                label.name, label.max_parallel_jobs)
            backing_node_record.request_id = req.id
            self._addBackingNode(label.name, backing_node_record)
        backing_node_log = (backing_node_record.node_id or
                            f'request {backing_node_record.request_id}')
        slot = backing_node_record.allocateSlot(node_id)
        self.log.info("Assigned node %s to backing node %s slot %s",
                      node_id, backing_node_log, slot)
        return backing_node_record, slot

    def _addBackingNode(self, label_name, backing_node_record):
        nodelist = self.backing_node_records.setdefault(label_name, [])
        nodelist.append(backing_node_record)

    def _deallocateBackingNode(self, node_id):
        self._init()
        for label_name, backing_node_records in \
                self.backing_node_records.items():
            for bn in backing_node_records:
                if bn.backsNode(node_id):
                    slot = bn.deallocateSlot(node_id)
                    self.log.info(
                        "Unassigned node %s from backing node %s slot %s",
                        node_id, bn.node_id, slot)
                    return

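    # When a backing request is fulfilled, we take a non-ephemeral lock on
    # the new backing node (so it survives launcher restarts), record our
    # ownership, label, and slot count in its user_data, and mark it IN_USE.
    # Failed requests mark the record failed so the create state machine
    # can surface the error.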
    def _checkBackingNodeRequests(self):
        self._init()
        waiting_requests = {}
        for label_name, backing_node_records in \
                self.backing_node_records.items():
            for bnr in backing_node_records:
                if bnr.request_id:
                    waiting_requests[bnr.request_id] = bnr
        if not waiting_requests:
            return
        for request in self.zk.nodeRequestIterator():
            if request.id not in waiting_requests:
                continue
            if request.state == zk.FAILED:
                self.log.error("Backing request %s failed", request.id)
                for label_name, records in self.backing_node_records.items():
                    for bnr in records[:]:
                        if bnr.request_id == request.id:
                            bnr.failed = True
                            records.remove(bnr)
            if request.state == zk.FULFILLED:
                bnr = waiting_requests[request.id]
                node_id = request.nodes[0]
                self.log.info("Backing request %s fulfilled with node id %s",
                              request.id, node_id)
                node = self._getNode(node_id)
                self.zk.lockNode(node, blocking=True, timeout=30,
                                 ephemeral=False, identifier=self.my_id)
                node.user_data = json.dumps({
                    'owner': self.my_id,
                    'label': bnr.label_name,
                    'slots': bnr.slot_count,
                })
                node.state = zk.IN_USE
                self.zk.storeNode(node)
                self.zk.deleteNodeRequest(request)
                bnr.request_id = None
                bnr.node_id = node_id

    def _getNode(self, node_id):
        return self.zk.getNode(node_id)