Merge "Add metastatic driver"

Zuul 2022-01-25 16:33:33 +00:00 committed by Gerrit Code Review
commit ff8bbdf8f8
12 changed files with 1128 additions and 4 deletions


@ -31,6 +31,7 @@ The following drivers are available.
openshift-pods
openstack
static
metastatic
The following sections are available. All are required unless
otherwise indicated.

doc/source/metastatic.rst Normal file

@ -0,0 +1,129 @@
.. _metastatic-driver:
.. default-domain:: zuul
Metastatic Driver
-----------------
This driver uses NodePool nodes (from any driver) as backing nodes to
further allocate "static-like" nodes for end use.
A typical use case is to be able to request a large node (a `backing
node`) from a cloud provider, and then divide that node up into
smaller nodes that are actually used (`requested nodes`). A backing
node can support one or more requested nodes, and backing nodes are
scaled up or down as necessary based on the number of requested
nodes.
The name is derived from the nodes it provides (which are like
"static" nodes) and the fact that the backing nodes come from NodePool
itself, which is "meta".
.. attr-overview::
:prefix: providers.[metastatic]
:maxdepth: 3
.. attr:: providers.[metastatic]
:type: list
A metastatic provider's resources are partitioned into groups
called `pools`, and within a pool, the node types which are to be
made available are listed.
.. note:: For documentation purposes the option names are prefixed
``providers.[metastatic]`` to disambiguate from other
drivers, but ``[metastatic]`` is not required in the
configuration (e.g. below
``providers.[metastatic].pools`` refers to the ``pools``
key in the ``providers`` section when the ``metastatic``
driver is selected).
Example:
.. code-block:: yaml
providers:
- name: meta-provider
driver: metastatic
pools:
- name: main
max-servers: 10
labels:
- name: small-node
backing-label: large-node
max-parallel-jobs: 2
grace-time: 600
.. attr:: name
:required:
A unique name for this provider configuration.
.. attr:: pools
:type: list
A pool defines a group of resources from the provider. Each pool has a
maximum number of nodes which can be launched from it, along with a number
of attributes that characterize the use of the backing nodes.
.. attr:: name
:required:
A unique name within the provider for this pool of resources.
.. attr:: max-servers
:type: int
Maximum number of servers spawnable from this pool. This can
be used to limit the number of servers. If not defined,
Nodepool can create as many servers as the backing node
providers support.
.. attr:: labels
:type: list
Each entry in a pool's `labels` section indicates that the
corresponding label is available for use in this pool.
.. code-block:: yaml
labels:
- name: small-node
backing-label: large-node
max-parallel-jobs: 2
grace-time: 600
Each entry is a dictionary with the following keys:
.. attr:: name
:type: str
:required:
Identifier for this label.
.. attr:: backing-label
:type: str
:required:
Refers to the name of a different label in Nodepool which
will be used to supply the backing nodes for requests of
this label.
.. attr:: max-parallel-jobs
:type: int
:default: 1
The number of jobs that can run in parallel on a single
backing node.
.. attr:: grace-time
:type: int
:default: 60
When all requested nodes which were assigned to a backing
node have been deleted, the backing node itself is
eligible for deletion. In order to reduce churn,
NodePool will wait a certain amount of time after the
last requested node is deleted to see if new requests
arrive for this label before deleting the backing node.
Set this value to the amount of time in seconds to wait.
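To get a rough feel for how backing capacity scales, the following
sketch (illustrative only, not part of the driver) computes how many
backing nodes a given number of requested nodes implies when each
backing node supplies ``max-parallel-jobs`` slots, using the values
from the example above:

.. code-block:: python

   import math

   # Value from the example configuration above (illustrative).
   max_parallel_jobs = 2

   def backing_nodes_needed(requested_nodes):
       # The driver fills free slots on existing backing nodes first
       # and only requests a new backing node when none remain, so the
       # steady-state count is the ceiling of requested / slots.
       return math.ceil(requested_nodes / max_parallel_jobs)

   print(backing_nodes_needed(3))  # 2 backing nodes
   print(backing_nodes_needed(5))  # 3 backing nodes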


@ -58,6 +58,7 @@ def generate_password():
class AzureInstance(statemachine.Instance):
def __init__(self, vm, nic=None, public_ipv4=None,
public_ipv6=None, sku=None):
super().__init__()
self.external_id = vm['name']
self.metadata = vm.get('tags', {})
self.private_ipv4 = None


@ -0,0 +1,36 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nodepool.driver.statemachine import StateMachineDriver
from nodepool.driver.statemachine import StateMachineProvider
from nodepool.driver.metastatic.config import MetastaticProviderConfig
from nodepool.driver.metastatic.adapter import MetastaticAdapter
class MetastaticDriver(StateMachineDriver):
def getProvider(self, provider_config):
# We usually don't override this method, but since our "cloud"
# is actually Nodepool itself, we need to interact with
# nodepool as a client, so we need a ZK connection. We can
# re-use the launcher's connection for this.
adapter = self.getAdapter(provider_config)
provider = StateMachineProvider(adapter, provider_config)
adapter._setProvider(provider)
return provider
def getProviderConfig(self, provider):
return MetastaticProviderConfig(self, provider)
def getAdapter(self, provider_config):
return MetastaticAdapter(provider_config)


@ -0,0 +1,441 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import math
import logging
import json
import time
from nodepool.driver.utils import QuotaInformation, RateLimiter
from nodepool.driver import statemachine
from nodepool import zk
""" This driver behaves like a static driver execpt that the backing
nodes come from other Nodepool drivers.
The intent is that users will request nodes from this driver, and if
any nodes already exist, the request will be satisfied with those
first. If not, then this driver will request a new node from another
driver in Nodepool, and then add that to this driver's pool of
available nodes. Each backing node may supply one or more nodes to
the end user.
For example, a user might request 3 nodes from this driver. Having
none available, this driver would then request a single node from an
AWS driver. Once that node is available, this driver might configure
8 terminal nodes all backed by the single AWS node, and fulfill the
request by allocating 3 of them.
If a further request arrived for 5 nodes, they would be fulfilled from
the remaining 5 slots.
Once all 8 nodes have been returned, this driver will release the
underlying AWS node and the AWS driver will reclaim it.
To accomplish this, the process is roughly:
* Upon request, if insufficient nodes available, request new backing
node(s). The requestor should be "NodePool:metastatic:{providername}".
* Lock backing nodes with a non-ephemeral lock (so that they persist
even if this launcher is stopped) and use
"NodePool:metastatic:{providername}" as identifier.
* Update the Node.user_data field in ZK to include
"NodePool:metastatic:{providername}" along with label and slot count
information for the backing node.
* Set the Node.driver_data field in ZK to include the backing node and
slot information of which backing node the requested node is
associated with.
* Periodically, delete unused backing nodes.
To identify our backing nodes:
Our id is: "NodePool:metastatic:{providername}".
For every node with our id in user_data:
If node locked and first lock contender has our id as identifier:
This is one of our nodes
The first check is for efficiency, the second is to avoid issues with
falsified user_data fields.
To cleanup unused backing nodes:
For each of our end nodes in use: mark backing node in use.
If a backing node hasn't been in use for {grace-time} seconds,
set the node to used and remove the lock on the backing node.
To identify our end nodes:
Check that the node provider matches us, and then consult the
driver_data field to find which backing node it's assigned to.
This driver acts as both a provider and a user of Nodepool. It
provides requested nodes and it uses backing nodes.
As a user, it stores node allocation data in the "user_data" field of
the backing node. As a provider, it stores node allocation data in the
"driver_data" field of the requested node.
In order to avoid extra write calls to ZK on every allocation (and to
avoid race conditions that could cause double accounting errors), the
only data we store on the backing node is: that we own it, its label,
and the number of slots it supports. On the requested node, we store
the backing node id and which slot in the backing node this node
occupies.
We have an in-memory proxy (BackingNodeRecord) for the backing nodes
to keep track of node allocation. When we start up, we initialize the
proxy based on the data in ZK.
"""
class MetastaticInstance(statemachine.Instance):
def __init__(self, backing_node, slot, node, metadata=None):
super().__init__()
if metadata:
self.metadata = metadata
else:
self.metadata = node.driver_data['metadata']
if backing_node:
self.interface_ip = backing_node.interface_ip
self.public_ipv4 = backing_node.public_ipv4
self.public_ipv6 = backing_node.public_ipv6
self.private_ipv4 = backing_node.private_ipv4
self.az = backing_node.az
self.region = backing_node.region
# Image overrides:
self.username = backing_node.username
self.python_path = backing_node.python_path
self.shell_type = backing_node.shell_type
self.connection_port = backing_node.connection_port
self.connection_type = backing_node.connection_type
backing_node_id = backing_node.id
else:
backing_node_id = None
self.driver_data = {
'metadata': self.metadata,
'backing_node': backing_node_id,
'slot': slot,
'node': node.id,
}
self.external_id = node.id
def getQuotaInformation(self):
return QuotaInformation(instances=1)
class MetastaticResource(statemachine.Resource):
def __init__(self, metadata, type, name):
super().__init__(metadata)
self.type = type
self.name = name
class MetastaticDeleteStateMachine(statemachine.StateMachine):
DEALLOCATING = 'deallocating node'
COMPLETE = 'complete'
def __init__(self, adapter, external_id):
super().__init__()
self.adapter = adapter
self.node_id = external_id
def advance(self):
if self.state == self.START:
self.adapter._deallocateBackingNode(self.node_id)
self.state = self.COMPLETE
if self.state == self.COMPLETE:
self.complete = True
class MetastaticCreateStateMachine(statemachine.StateMachine):
REQUESTING = 'requesting backing node'
ALLOCATING = 'allocating node'
COMPLETE = 'complete'
def __init__(self, adapter, hostname, label, image_external_id,
metadata, retries):
super().__init__()
self.adapter = adapter
self.retries = retries
self.attempts = 0
self.image_external_id = image_external_id
self.metadata = metadata
self.hostname = hostname
self.label = label
self.node_id = metadata['nodepool_node_id']
def advance(self):
if self.state == self.START:
self.backing_node_record, self.slot = \
self.adapter._allocateBackingNode(
self.label, self.node_id)
if self.backing_node_record.node_id is None:
# We need to make a new request
self.state = self.REQUESTING
else:
# We have an existing node
self.state = self.COMPLETE
self.external_id = self.node_id
if self.state == self.REQUESTING:
self.adapter._checkBackingNodeRequests()
if self.backing_node_record.failed:
raise Exception("Backing node failed")
if self.backing_node_record.node_id is None:
return
self.state = self.COMPLETE
if self.state == self.COMPLETE:
backing_node = self.adapter._getNode(
self.backing_node_record.node_id)
node = self.adapter._getNode(self.node_id)
instance = MetastaticInstance(backing_node, self.slot,
node, self.metadata)
self.complete = True
return instance
class BackingNodeRecord:
"""An in-memory record of backing nodes and what nodes are allocated
to them.
"""
def __init__(self, label_name, slot_count):
self.label_name = label_name
self.slot_count = slot_count
self.node_id = None
self.request_id = None
self.allocated_nodes = [None for x in range(slot_count)]
self.failed = False
self.last_used = time.time()
def hasAvailableSlot(self):
return None in self.allocated_nodes
def isEmpty(self):
return not any(self.allocated_nodes)
def allocateSlot(self, node_id, slot_id=None):
if slot_id is None:
idx = self.allocated_nodes.index(None)
else:
idx = slot_id
if self.allocated_nodes[idx] is not None:
raise Exception("Slot %s of %s is already allocated",
idx, self.node_id)
self.allocated_nodes[idx] = node_id
return idx
def deallocateSlot(self, node_id):
idx = self.allocated_nodes.index(node_id)
self.allocated_nodes[idx] = None
self.last_used = time.time()
def backsNode(self, node_id):
return node_id in self.allocated_nodes
class MetastaticAdapter(statemachine.Adapter):
log = logging.getLogger("nodepool.driver.metastatic.MetastaticAdapter")
def __init__(self, provider_config):
self.provider = provider_config
self.rate_limiter = RateLimiter(self.provider.name,
self.provider.rate)
self.backing_node_records = {} # label -> [BackingNodeRecord]
self.pending_requests = []
# The requestor id
self.my_id = f'NodePool:metastatic:{self.provider.name}'
# On startup we need to recover our state from the ZK db; this
# flag ensures we only do that once.
self.performed_init = False
@property
def zk(self):
return self._provider._zk
def getCreateStateMachine(self, hostname, label,
image_external_id, metadata, retries):
return MetastaticCreateStateMachine(self, hostname, label,
image_external_id, metadata,
retries)
def getDeleteStateMachine(self, external_id):
return MetastaticDeleteStateMachine(self, external_id)
def listResources(self):
self._init()
# Since this is called periodically, this is a good place to
# see about deleting unused backing nodes.
now = time.time()
for label_name, backing_node_records in \
self.backing_node_records.items():
for bnr in backing_node_records[:]:
label_config = self.provider._getLabel(bnr.label_name)
if (bnr.isEmpty() and
now - bnr.last_used > label_config.grace_time):
self.log.info("Backing node %s has been idle for "
"%s seconds, releasing",
bnr.node_id, now - bnr.last_used)
node = self._getNode(bnr.node_id)
node.state = zk.USED
self.zk.storeNode(node)
self.zk.forceUnlockNode(node)
backing_node_records.remove(bnr)
return []
def deleteResource(self, resource):
self.log.warning("Unhandled request to delete leaked "
f"{resource.type}: {resource.name}")
# Unused; change log message if we ever use this.
def listInstances(self):
# We don't need this unless we're managing quota
self._init()
return []
def getQuotaLimits(self):
return QuotaInformation(default=math.inf)
def getQuotaForLabel(self, label):
return QuotaInformation(instances=1)
# Local implementation below
def _init(self):
if self.performed_init:
return
self.log.debug("Performing init")
# Find backing nodes
backing_node_map = {}
for node in self.zk.nodeIterator():
try:
user_data = json.loads(node.user_data)
except Exception:
continue
if 'owner' not in user_data:
continue
if user_data['owner'] == self.my_id:
# This may be a backing node for us, but double check
contenders = self.zk.getNodeLockContenders(node)
if contenders and contenders[0] == self.my_id:
# We hold the lock on this node
backing_node_record = BackingNodeRecord(user_data['label'],
user_data['slots'])
backing_node_record.node_id = node.id
self.log.info("Found backing node %s for %s",
node.id, user_data['label'])
self._addBackingNode(user_data['label'],
backing_node_record)
backing_node_map[node.id] = backing_node_record
# Assign nodes to backing nodes
for node in self.zk.nodeIterator():
if node.provider == self.provider.name:
if not node.driver_data:
continue
bn_id = node.driver_data.get('backing_node')
bn_slot = node.driver_data.get('slot')
if bn_id and bn_id in backing_node_map:
backing_node_record = backing_node_map[bn_id]
backing_node_record.allocateSlot(node.id, bn_slot)
self.log.info("Found node %s assigned to backing node %s "
"slot %s",
node.id, backing_node_record.node_id,
bn_slot)
self.performed_init = True
def _setProvider(self, provider):
self._provider = provider
def _allocateBackingNode(self, label, node_id):
self._init()
# if we have room for the label, allocate and return existing slot
# otherwise, make a new backing node
backing_node_record = None
for bnr in self.backing_node_records.get(label.name, []):
if bnr.hasAvailableSlot():
backing_node_record = bnr
break
if backing_node_record is None:
req = zk.NodeRequest()
req.node_types = [label.backing_label]
req.state = zk.REQUESTED
req.requestor = self.my_id
self.zk.storeNodeRequest(req, priority='100')
backing_node_record = BackingNodeRecord(
label.name, label.max_parallel_jobs)
backing_node_record.request_id = req.id
self._addBackingNode(label.name, backing_node_record)
slot = backing_node_record.allocateSlot(node_id)
self.log.info("Assigned node %s to backing node %s slot %s",
node_id, backing_node_record.node_id, slot)
return backing_node_record, slot
def _addBackingNode(self, label_name, backing_node_record):
nodelist = self.backing_node_records.setdefault(label_name, [])
nodelist.append(backing_node_record)
def _deallocateBackingNode(self, node_id):
self._init()
for label_name, backing_node_records in \
self.backing_node_records.items():
for bn in backing_node_records:
if bn.backsNode(node_id):
bn.deallocateSlot(node_id)
return
def _checkBackingNodeRequests(self):
self._init()
waiting_requests = {}
for label_name, backing_node_records in \
self.backing_node_records.items():
for bnr in backing_node_records:
if bnr.request_id:
waiting_requests[bnr.request_id] = bnr
if not waiting_requests:
return
for request in self.zk.nodeRequestIterator():
if request.id not in waiting_requests:
continue
if request.state == zk.FAILED:
self.log.error("Backing request %s failed", request.id)
for label_name, records in self.backing_node_records.items():
for bnr in records[:]:
if bnr.request_id == request.id:
bnr.failed = True
records.remove(bnr)
if request.state == zk.FULFILLED:
bnr = waiting_requests[request.id]
node_id = request.nodes[0]
self.log.info("Backing request %s fulfilled with node id %s",
request.id, node_id)
node = self._getNode(node_id)
self.zk.lockNode(node, blocking=True, timeout=30,
ephemeral=False, identifier=self.my_id)
node.user_data = json.dumps({
'owner': self.my_id,
'label': bnr.label_name,
'slots': bnr.slot_count,
})
node.state = zk.IN_USE
self.zk.storeNode(node)
self.zk.deleteNodeRequest(request)
bnr.request_id = None
bnr.node_id = node_id
def _getNode(self, node_id):
return self.zk.getNode(node_id)


@ -0,0 +1,158 @@
# Copyright 2018 Red Hat
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import voluptuous as v
from nodepool.driver import ConfigPool
from nodepool.driver import ConfigValue
from nodepool.driver import ProviderConfig
class MetastaticCloudImage(ConfigValue):
def __init__(self):
self.name = 'unused'
self.username = 'unknown'
self.python_path = 'unknown'
self.shell_type = 'unknown'
self.connection_port = 'unknown'
self.connection_type = 'unknown'
class MetastaticLabel(ConfigValue):
ignore_equality = ['pool']
def __init__(self, label, provider_config, provider_pool):
self.pool = provider_pool
self.name = label['name']
self.backing_label = label['backing-label']
self.diskimage = None
self.cloud_image = MetastaticCloudImage()
self.max_parallel_jobs = label.get('max-parallel-jobs', 1)
self.grace_time = label.get('grace-time', 60)
@staticmethod
def getSchema():
return {
v.Required('name'): str,
v.Required('backing-label'): str,
'max-parallel-jobs': int,
'grace-time': int,
}
def isBackingConfigEqual(self, other):
# An equality check of the backing configuration
return (
self.backing_label == other.backing_label and
self.max_parallel_jobs == other.max_parallel_jobs and
self.grace_time == other.grace_time
)
class MetastaticPool(ConfigPool):
ignore_equality = ['provider']
def __init__(self, provider_config, pool_config):
super().__init__()
self.provider = provider_config
self.labels = {}
# We will just use the interface_ip of the backing node
self.use_internal_ip = False
self.load(pool_config)
def load(self, pool_config):
self.name = pool_config['name']
self.max_servers = pool_config.get('max-servers', math.inf)
for label in pool_config.get('labels', []):
b = MetastaticLabel(label, self.provider, self)
self.labels[b.name] = b
@staticmethod
def getSchema():
label = MetastaticLabel.getSchema()
pool = ConfigPool.getCommonSchemaDict()
pool.update({
v.Required('name'): str,
v.Required('labels'): [label],
'max-servers': int,
})
return pool
class MetastaticProviderConfig(ProviderConfig):
def __init__(self, driver, provider):
super().__init__(provider)
self._pools = {}
self.rate = None
self.launch_retries = None
@property
def pools(self):
return self._pools
@property
def manage_images(self):
return False
@staticmethod
def reset():
pass
def load(self, config):
self.rate = self.provider.get('rate', 1)
self.launch_retries = self.provider.get('launch-retries', 3)
self.launch_timeout = self.provider.get('launch-timeout', 3600)
self.boot_timeout = self.provider.get('boot-timeout', 120)
label_defs = {}
for pool in self.provider.get('pools', []):
pp = MetastaticPool(self, pool)
self._pools[pp.name] = pp
for label in pool.get('labels', []):
pl = MetastaticLabel(label, self, pp)
if pl.backing_label in label_defs:
if not pl.isBackingConfigEqual(
label_defs[pl.backing_label]):
raise Exception(
"Multiple label definitions for the same "
"backing label must be identical")
label_defs[pl.backing_label] = pl
config.labels[pl.name].pools.append(pp)
def getSchema(self):
pool = MetastaticPool.getSchema()
provider = ProviderConfig.getCommonSchemaDict()
provider.update({
v.Required('pools'): [pool],
})
return v.Schema(provider)
def getSupportedLabels(self, pool_name=None):
labels = set()
for pool in self._pools.values():
if not pool_name or (pool.name == pool_name):
labels.update(pool.labels.keys())
return labels
def _getLabel(self, label):
for pool in self._pools.values():
if label in pool.labels:
return pool.labels[label]


@ -151,6 +151,15 @@ class StateMachineNodeLauncher(stats.StatsReporter):
node.public_ipv6 = instance.public_ipv6
node.region = instance.region
node.az = instance.az
node.driver_data = instance.driver_data
# Optionally, if the node has updated values that we set from
# the image attributes earlier, set those.
for attr in ('username', 'python_path', 'shell_type',
'connection_port', 'connection_type'):
if hasattr(instance, attr):
setattr(node, attr, getattr(instance, attr))
self.zk.storeNode(node)
def runStateMachine(self):
@ -698,6 +707,16 @@ class Instance:
* private_ipv4: str
* az: str
* region: str
* driver_data: any
And the following are even more optional (as they are usually
already set from the image configuration):
* username: str
* python_path: str
* shell_type: str
* connection_port: str
* connection_type: str
"""
def __init__(self):
self.ready = False
@ -710,6 +729,7 @@ class Instance:
self.az = None
self.region = None
self.metadata = {}
self.driver_data = None
def __repr__(self):
state = []

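For illustration, here is a minimal sketch (the driver and record names
are hypothetical, not part of this change) of a driver-specific Instance
subclass populating the new driver_data field plus the optional
connection overrides that StateMachineNodeLauncher now copies onto the
node when present:

from nodepool.driver import statemachine


class ExampleInstance(statemachine.Instance):
    def __init__(self, record):
        super().__init__()
        self.external_id = record['id']
        self.interface_ip = record['ip']
        # Arbitrary driver bookkeeping carried on the node in ZooKeeper.
        self.driver_data = {'backing_node': record['backing_id'],
                            'slot': record['slot']}
        # Optional overrides; copied to the node only because they are
        # set on this instance.
        self.username = record.get('username', 'zuul')
        self.connection_type = 'ssh'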

@ -0,0 +1,40 @@
labels:
- name: backing-label
min-ready: 0
- name: user-label
min-ready: 0
- name: bad-label
min-ready: 0
providers:
# The backing node provider: a cloud
- name: fake-provider
cloud: fake
driver: fake
region-name: fake-region
rate: 0.0001
cloud-images:
- name: fake-image
pools:
- name: main
max-servers: 96
labels:
- name: backing-label
cloud-image: fake-image
min-ram: 8192
flavor-name: 'Fake'
- name: meta-provider
driver: metastatic
pools:
- name: main
max-servers: 10
labels:
- name: user-label
backing-label: backing-label
max-parallel-jobs: 2
grace-time: 2
- name: bad-label
backing-label: backing-label
max-parallel-jobs: 4 # We can't have different numbers of max-parallel-jobs
grace-time: 2


@ -0,0 +1,40 @@
labels:
- name: backing-label
min-ready: 0
- name: user-label
min-ready: 0
- name: bad-label
min-ready: 0
providers:
# The backing node provider: a cloud
- name: fake-provider
cloud: fake
driver: fake
region-name: fake-region
rate: 0.0001
cloud-images:
- name: fake-image
pools:
- name: main
max-servers: 96
labels:
- name: backing-label
cloud-image: fake-image
min-ram: 8192
flavor-name: 'Fake'
- name: meta-provider
driver: metastatic
pools:
- name: main
max-servers: 10
labels:
- name: user-label
backing-label: backing-label
max-parallel-jobs: 2
grace-time: 2
- name: bad-label
backing-label: backing-label
max-parallel-jobs: 2 # These are identical, so it's okay
grace-time: 2

nodepool/tests/fixtures/metastatic.yaml vendored Normal file

@ -0,0 +1,48 @@
webapp:
port: 8005
listen_address: '0.0.0.0'
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}
chroot: {zookeeper_chroot}
zookeeper-tls:
ca: {zookeeper_ca}
cert: {zookeeper_cert}
key: {zookeeper_key}
labels:
- name: backing-label
min-ready: 0
- name: user-label
min-ready: 0
providers:
# The backing node provider: a cloud
- name: fake-provider
cloud: fake
driver: fake
region-name: fake-region
rate: 0.0001
cloud-images:
- name: fake-image
pools:
- name: main
max-servers: 96
labels:
- name: backing-label
cloud-image: fake-image
min-ram: 8192
flavor-name: 'Fake'
- name: meta-provider
driver: metastatic
pools:
- name: main
max-servers: 10
labels:
- name: user-label
backing-label: backing-label
max-parallel-jobs: 2
grace-time: 2


@ -0,0 +1,181 @@
# Copyright (C) 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import logging
import testtools
from nodepool import tests
from nodepool import zk
from nodepool.driver.statemachine import StateMachineProvider
from nodepool.cmd.config_validator import ConfigValidator
class TestDriverMetastatic(tests.DBTestCase):
log = logging.getLogger("nodepool.TestDriverMetastatic")
def setUp(self):
super().setUp()
StateMachineProvider.MINIMUM_SLEEP = 0.1
StateMachineProvider.MAXIMUM_SLEEP = 1
def _requestNode(self):
req = zk.NodeRequest()
req.state = zk.REQUESTED
req.node_types.append('user-label')
self.zk.storeNodeRequest(req)
req = self.waitForNodeRequest(req)
self.assertEqual(req.state, zk.FULFILLED)
self.assertNotEqual(req.nodes, [])
node = self.zk.getNode(req.nodes[0])
self.assertEqual(node.allocated_to, req.id)
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
self.assertEqual(node.provider, 'meta-provider')
return node
def _getNodes(self):
nodes = [n for n in self.zk.nodeIterator()]
nodes = sorted(nodes, key=lambda n: n.id)
self.log.debug("Nodes:")
for n in nodes:
self.log.debug(' %s %s', n.id, n.provider)
return nodes
def test_metastatic_validator(self):
# Test schema validation
config = os.path.join(os.path.dirname(tests.__file__),
'fixtures', 'config_validate',
'metastatic_ok.yaml')
validator = ConfigValidator(config)
ret = validator.validate()
self.assertEqual(ret, 0)
# Test runtime value assertions
configfile = self.setup_config('config_validate/metastatic_error.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
with testtools.ExpectedException(Exception, 'Multiple label def'):
pool.loadConfig()
configfile = self.setup_config('config_validate/metastatic_ok.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.loadConfig()
def test_metastatic(self):
configfile = self.setup_config('metastatic.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
self.wait_for_config(pool)
manager = pool.getProviderManager('fake-provider')
manager._client.create_image(name="fake-image")
# Request a node, verify that there is a backing node, and it
# has the same connection info
node1 = self._requestNode()
nodes = self._getNodes()
self.assertEqual(len(nodes), 2)
self.assertEqual(nodes[0], node1)
self.assertNotEqual(nodes[1], node1)
bn1 = nodes[1]
self.assertEqual(bn1.provider, 'fake-provider')
self.assertEqual(bn1.interface_ip, node1.interface_ip)
self.assertEqual(bn1.python_path, node1.python_path)
self.assertEqual('auto', node1.python_path)
self.assertEqual(bn1.shell_type, node1.shell_type)
self.assertEqual(None, node1.shell_type)
self.assertEqual(bn1.host_keys, node1.host_keys)
self.assertEqual(['ssh-rsa FAKEKEY'], node1.host_keys)
self.assertEqual(bn1.id, node1.driver_data['backing_node'])
# Allocate a second node, should have same backing node
node2 = self._requestNode()
nodes = self._getNodes()
self.assertEqual(nodes, [node1, bn1, node2])
self.assertEqual(bn1.id, node2.driver_data['backing_node'])
# Allocate a third node, should have a second backing node
node3 = self._requestNode()
nodes = self._getNodes()
self.assertNotEqual(nodes[4], node1)
self.assertNotEqual(nodes[4], node2)
self.assertNotEqual(nodes[4], node3)
bn2 = nodes[4]
self.assertEqual(nodes, [node1, bn1, node2, node3, bn2])
self.assertEqual(bn2.id, node3.driver_data['backing_node'])
self.assertNotEqual(bn1.id, bn2.id)
# Delete node #2, verify that both backing nodes exist
node2.state = zk.DELETING
self.zk.storeNode(node2)
self.waitForNodeDeletion(node2)
# Allocate a replacement, verify it occupies slot 2
node4 = self._requestNode()
nodes = self._getNodes()
self.assertEqual(nodes, [node1, bn1, node3, bn2, node4])
self.assertEqual(bn1.id, node4.driver_data['backing_node'])
# Delete #4 and #1; verify backing node 1 is removed
node4.state = zk.DELETING
self.zk.storeNode(node4)
node1.state = zk.DELETING
self.zk.storeNode(node1)
self.waitForNodeDeletion(node4)
self.waitForNodeDeletion(node1)
self.waitForNodeDeletion(bn1)
nodes = self._getNodes()
self.assertEqual(nodes, [node3, bn2])
def test_metastatic_startup(self):
configfile = self.setup_config('metastatic.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
self.wait_for_config(pool)
manager = pool.getProviderManager('fake-provider')
manager._client.create_image(name="fake-image")
# Request a node, verify that there is a backing node, and it
# has the same connection info
node1 = self._requestNode()
nodes = self._getNodes()
bn1 = nodes[1]
self.assertEqual(nodes, [node1, bn1])
# Restart the provider and make sure we load data correctly
pool.stop()
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
self.wait_for_config(pool)
manager = pool.getProviderManager('fake-provider')
manager._client.create_image(name="fake-image")
# Allocate a second node, should have same backing node
node2 = self._requestNode()
nodes = self._getNodes()
self.assertEqual(nodes, [node1, bn1, node2])
self.assertEqual(bn1.id, node2.driver_data['backing_node'])
# Allocate a third node, should have a second backing node
node3 = self._requestNode()
nodes = self._getNodes()
bn2 = nodes[4]
self.assertEqual(nodes, [node1, bn1, node2, node3, bn2])
self.assertEqual(bn2.id, node3.driver_data['backing_node'])
self.assertNotEqual(bn1.id, bn2.id)


@ -594,6 +594,7 @@ class Node(BaseModel):
self.attributes = None
self.python_path = None
self.tenant_name = None
self.driver_data = None
def __repr__(self):
d = self.toDict()
@ -635,7 +636,8 @@ class Node(BaseModel):
self.resources == other.resources and
self.attributes == other.attributes and
self.python_path == other.python_path and
self.tenant_name == other.tenant_name)
self.tenant_name == other.tenant_name and
self.driver_data == other.driver_data)
else:
return False
@ -688,6 +690,7 @@ class Node(BaseModel):
d['attributes'] = self.attributes
d['python_path'] = self.python_path
d['tenant_name'] = self.tenant_name
d['driver_data'] = self.driver_data
return d
@staticmethod
@ -755,6 +758,7 @@ class Node(BaseModel):
self.python_path = d.get('python_path')
self.shell_type = d.get('shell_type')
self.tenant_name = d.get('tenant_name')
self.driver_data = d.get('driver_data')
class ZooKeeper(object):
@ -1943,7 +1947,8 @@ class ZooKeeper(object):
request.lock.release()
request.lock = None
def lockNode(self, node, blocking=True, timeout=None):
def lockNode(self, node, blocking=True, timeout=None,
ephemeral=True, identifier=None):
'''
Lock a node.
@ -1957,6 +1962,10 @@ class ZooKeeper(object):
acquire the lock
:param int timeout: When blocking, how long to wait for the lock
to get acquired. None, the default, waits forever.
:param bool ephemeral: Whether to use an ephemeral lock. Unless
you have a really good reason, use the default of True.
:param str identifier: Identifies the lock holder. The default
of None is usually fine.
:raises: TimeoutException if we failed to acquire the lock when
blocking with a timeout. ZKLockException if we are not blocking
@ -1964,8 +1973,8 @@ class ZooKeeper(object):
'''
path = self._nodeLockPath(node.id)
try:
lock = Lock(self.client, path)
have_lock = lock.acquire(blocking, timeout)
lock = Lock(self.client, path, identifier)
have_lock = lock.acquire(blocking, timeout, ephemeral)
except kze.LockTimeout:
raise npe.TimeoutException(
"Timeout trying to acquire lock %s" % path)
@ -1998,6 +2007,26 @@ class ZooKeeper(object):
node.lock.release()
node.lock = None
def forceUnlockNode(self, node):
'''
Forcibly unlock a node.
:param Node node: The node to unlock.
'''
path = self._nodeLockPath(node.id)
try:
self.client.delete(path, recursive=True)
except kze.NoNodeError:
pass
def getNodeLockContenders(self, node):
'''
Return the contenders for a node lock.
'''
path = self._nodeLockPath(node.id)
lock = Lock(self.client, path)
return lock.contenders()
def getNodes(self):
'''
Get the current list of all nodes.
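As a hedged usage sketch (the requestor id below is illustrative), this
is roughly how the metastatic adapter combines the new lock options with
the helpers added above:

MY_ID = 'NodePool:metastatic:meta-provider'  # illustrative requestor id


def claim_backing_node(zk, node):
    # A persistent (non-ephemeral) lock survives a launcher restart; the
    # identifier records who holds it.
    zk.lockNode(node, blocking=True, timeout=30,
                ephemeral=False, identifier=MY_ID)


def is_our_backing_node(zk, node):
    # Ownership check: the first lock contender must carry our identifier.
    contenders = zk.getNodeLockContenders(node)
    return bool(contenders) and contenders[0] == MY_ID


def release_backing_node(zk, node):
    # Drop the persistent lock once the backing node is retired.
    zk.forceUnlockNode(node)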