Browse Source

Merge "Add a state machine driver framework"

Zuul 1 month ago
committed by Gerrit Code Review
  1. 35
  2. 564
  3. 34


@ -203,3 +203,38 @@ See the ``gce`` provider for an example.
.. autoclass:: nodepool.driver.simple.SimpleTaskManagerDriver
State Machine Drivers
.. note:: This system is still in development and lacks robust support
for quotas or image building.
To use this system, you will need to implement a few subclasses.
First, create a :ref:`provider_config` subclass as you would for any
Then, subclass :py:class:`~nodepool.driver.statemachine.Instance` to
map remote instance data into a format the driver can understand.
Next, create two subclasses of
:py:class:`~nodepool.driver.statemachine.StateMachine` to
implement creating and deleting instances.
Subclass :py:class:`~nodepool.driver.statemachine.Adapter` to
implement the main methods that interact with the cloud.
Finally, subclass
:py:class:`~nodepool.driver.statemachine.StateMachineDriver` to tie
them all together.
See the ``example`` provider for an example.
.. autoclass:: nodepool.driver.statemachine.Instance
.. autoclass:: nodepool.driver.statemachine.StateMachine
.. autoclass:: nodepool.driver.statemachine.Adapter
.. autoclass:: nodepool.driver.statemachine.StateMachineDriver


@ -0,0 +1,564 @@
# Copyright 2019 Red Hat
# Copyright 2021 Acme Gating, LLC
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import logging
import math
from nodepool.driver import Driver, NodeRequestHandler, Provider
from nodepool.driver.utils import QuotaInformation, QuotaSupport
from nodepool.nodeutils import iterate_timeout, nodescan
from nodepool import exceptions
from nodepool import zk
from kazoo import exceptions as kze
class StateMachineHandler(NodeRequestHandler):
log = logging.getLogger("nodepool.driver.simple."
def __init__(self, pw, request):
super().__init__(pw, request)
self.state_machines = []
def alive_thread_count(self):
return len([sm for sm in self.state_machines if sm[1].complete])
def imagesAvailable(self):
Determines if the requested images are available for this provider.
:returns: True if it is available, False otherwise.
return True
def hasProviderQuota(self, node_types):
Checks if a provider has enough quota to handle a list of nodes.
This does not take our currently existing nodes into account.
:param node_types: list of node types to check
:return: True if the node list fits into the provider, False otherwise
needed_quota = QuotaInformation()
for ntype in node_types:
self.manager.quotaNeededByLabel(ntype, self.pool))
if hasattr(self.pool, 'ignore_provider_quota'):
if not self.pool.ignore_provider_quota:
cloud_quota = self.manager.estimatedNodepoolQuota()
if not cloud_quota.non_negative():
return False
# Now calculate pool specific quota. Values indicating no quota default
# to math.inf representing infinity that can be calculated with.
pool_quota = QuotaInformation(
cores=getattr(self.pool, 'max_cores', None),
ram=getattr(self.pool, 'max_ram', None),
return pool_quota.non_negative()
def hasRemainingQuota(self, ntype):
Checks if the predicted quota is enough for an additional node of type
:param ntype: node type for the quota check
:return: True if there is enough quota, False otherwise
needed_quota = self.manager.quotaNeededByLabel(ntype, self.pool)
# Calculate remaining quota which is calculated as:
# quota = <total nodepool quota> - <used quota> - <quota for node>
cloud_quota = self.manager.estimatedNodepoolQuota()
self.log.debug("Predicted remaining provider quota: %s",
if not cloud_quota.non_negative():
return False
# Now calculate pool specific quota. Values indicating no quota default
# to math.inf representing infinity that can be calculated with.
pool_quota = QuotaInformation(
cores=getattr(self.pool, 'max_cores', None),
ram=getattr(self.pool, 'max_ram', None),
self.log.debug("Current pool quota: %s" % pool_quota)
self.log.debug("Predicted remaining pool quota: %s", pool_quota)
return pool_quota.non_negative()
def _gatherHostKeys(self, node):
keys = []
if self.pool.host_key_checking:
if (node.connection_type == 'ssh' or
node.connection_type == 'network_cli'):
gather_hostkeys = True
gather_hostkeys = False
keys = nodescan(node.interface_ip, port=node.connection_port,
timeout=180, gather_hostkeys=gather_hostkeys)
except Exception:
raise exceptions.LaunchKeyscanException(
"Can't scan instance %s key" %
node.host_keys = keys
def _updateNodeFromInstance(self, node, instance, gather_keys=False):
if instance is None:
label = self.pool.labels[node.type[0]]
if self.pool.use_internal_ip and instance.private_ipv4:
server_ip = instance.private_ipv4
server_ip = instance.interface_ip
node.external_id = instance.external_id
node.interface_ip = server_ip
node.public_ipv4 = instance.public_ipv4
node.private_ipv4 = instance.private_ipv4
node.public_ipv6 = instance.public_ipv6
node.region = instance.region =
node.username = label.cloud_image.username
node.python_path = label.cloud_image.python_path
node.connection_port = label.cloud_image.connection_port
node.connection_type = label.cloud_image.connection_type
if gather_keys:
def _runStateMachine(self, node, state_machine):
if state_machine.complete:
return True
instance = None
now = time.monotonic()
if (now - state_machine.start_time >
raise Exception("Timeout waiting for instance creation")
instance = state_machine.advance()
self.log.debug(f"State machine for {} at "
if not node.external_id:
if not state_machine.external_id:
raise Exception("Driver implementation error: state "
"machine must produce external ID "
"after first advancement")
node.external_id = state_machine.external_id
if state_machine.complete:
self.log.debug(f"Node {} is ready")
node.state = zk.READY
self._updateNodeFromInstance(node, instance, gather_keys=True)
except kze.SessionExpiredError:
# Our node lock is gone, leaving the node state as BUILDING.
# This will get cleaned up in ZooKeeper automatically, but we
# must still set our cached node state to FAILED for the
# NodeLaunchManager's poll() method.
"Lost ZooKeeper session trying to launch for node %s",
node.state = zk.FAILED
node.external_id = state_machine.external_id
# TODO: stats
# statsd_key = 'error.zksession'
except exceptions.QuotaException:"Aborting node %s due to quota failure" %
node.state = zk.ABORTED
node.external_id = state_machine.external_id
# TODO: stats
# statsd_key = 'error.quota'
except Exception:
"Launch failed for node %s:",
node.state = zk.FAILED
node.external_id = state_machine.external_id
# TODO: stats
# if hasattr(e, 'statsd_key'):
# statsd_key = e.statsd_key
# else:
# statsd_key = 'error.unknown'
if node.state == zk.FAILED:
state_machine.complete = True
if state_machine.complete:
return True
def launchesComplete(self):
Check if all launch requests have completed.
When all of the Node objects have reached a final state (READY, FAILED
or ABORTED), we'll know all threads have finished the launch process.
all_complete = True
for (node, state_machine) in self.state_machines:
if not self._runStateMachine(node, state_machine):
all_complete = False
return all_complete
def launch(self, node):
label = self.pool.labels[node.type[0]]
hostname = 'nodepool-' +
retries = self.manager.provider.launch_retries
metadata = {'nodepool_node_id':,
state_machine = self.manager.adapter.getCreateStateMachine(
hostname, label, metadata, retries)
self.state_machines.append((node, state_machine))
class StateMachineProvider(Provider, QuotaSupport):
"""The Provider implementation for the StateMachineManager driver
log = logging.getLogger("nodepool.driver.statemachine."
def __init__(self, adapter, provider):
self.provider = provider
self.adapter = adapter
self.delete_state_machines = {}
self._zk = None
def start(self, zk_conn):
self._zk = zk_conn
def getRequestHandler(self, poolworker, request):
return StateMachineHandler(poolworker, request)
def labelReady(self, label):
return True
def getProviderLimits(self):
return self.adapter.getQuotaLimits()
except NotImplementedError:
return QuotaInformation(
def quotaNeededByLabel(self, ntype, pool):
provider_label = pool.labels[ntype]
return self.adapter.getQuotaForLabel(provider_label)
except NotImplementedError:
return QuotaInformation()
def unmanagedQuotaUsed(self):
Sums up the quota used by servers unmanaged by nodepool.
:return: Calculated quota in use by unmanaged servers
used_quota = QuotaInformation()
node_ids = set([ for n in self._zk.nodeIterator()])
for instance in self.adapter.listInstances():
meta = instance.metadata
nodepool_provider_name = meta.get('nodepool_provider_name')
if (nodepool_provider_name and
nodepool_provider_name ==
# This provider (regardless of the launcher) owns this
# node so it must not be accounted for unmanaged
# quota; unless it has leaked.
nodepool_node_id = meta.get('nodepool_node_id')
if nodepool_node_id and nodepool_node_id in node_ids:
# It has not leaked.
qi = instance.getQuotaInformation()
except NotImplementedError:
qi = QuotaInformation()
return used_quota
def cleanupNode(self, external_id):
# TODO: This happens in a thread-per-node in the launcher
# (above the driver level). If we want to make this
# single-threaded (yes), we'll need to modify the launcher
# itself.
state_machine = self.adapter.getDeleteStateMachine(
self.delete_state_machines[external_id] = state_machine
def waitForNodeCleanup(self, external_id, timeout=600):
for count in iterate_timeout(
timeout, exceptions.ServerDeleteException,
"server %s deletion" % external_id):
sm = self.delete_state_machines[external_id]
self.log.debug(f"State machine for {external_id} at "
if sm.complete:
self.delete_state_machines.pop(external_id, None)
def cleanupLeakedResources(self):
known_nodes = set()
for node in self._zk.nodeIterator():
if node.provider !=
metadata = {'nodepool_provider_name':}
self.adapter.cleanupLeakedResources(known_nodes, metadata)
# Driver implementation
class StateMachineDriver(Driver):
"""Entrypoint for a state machine driver"""
def getProvider(self, provider_config):
# Return a provider.
# Usually this method does not need to be overridden.
adapter = self.getAdapter(provider_config)
return StateMachineProvider(adapter, provider_config)
# Public interface
def getProviderConfig(self, provider):
"""Instantiate a config object
:param dict provider: A dictionary of YAML config describing
the provider.
:returns: A ProviderConfig instance with the parsed data.
raise NotImplementedError()
def getAdapter(self, provider_config):
"""Instantiate an adapter
:param ProviderConfig provider_config: An instance of
ProviderConfig previously returned by :py:meth:`getProviderConfig`.
:returns: An instance of :py:class:`SimpleTaskManagerAdapter`
raise NotImplementedError()
# Adapter public interface
class Instance:
"""Represents a cloud instance
This class is used by the Simple Task Manager Driver classes to
represent a standardized version of a remote cloud instance.
Implement this class in your driver, override the :py:meth:`load`
method, and supply as many of the fields as possible.
The following attributes are required:
* ready: bool (whether the instance is ready)
* deleted: bool (whether the instance is in a deleted state)
* external_id: str (the unique id of the instance)
* interface_ip: str
* metadata: dict
The following are optional:
* public_ipv4: str
* public_ipv6: str
* private_ipv4: str
* az: str
* region: str
def __init__(self):
self.ready = False
self.deleted = False
self.external_id = None
self.public_ipv4 = None
self.public_ipv6 = None
self.private_ipv4 = None
self.interface_ip = None = None
self.region = None
self.metadata = {}
def __repr__(self):
state = []
if self.ready:
if self.deleted:
state = ' '.join(state)
return '<{klass} {external_id} {state}>'.format(
def getQuotaInformation(self):
"""Return quota information about this instance.
:returns: A :py:class:`QuotaInformation` object.
raise NotImplementedError()
class StateMachine:
START = 'start'
def __init__(self):
self.state = self.START
self.external_id = None
self.complete = False
self.start_time = time.monotonic()
def advance(self):
class Adapter:
"""Cloud adapter for the State Machine Driver
This class will be instantiated once for each Nodepool provider.
It may be discarded and replaced if the configuration changes.
You may establish a single long-lived connection to the cloud in
the initializer if you wish.
:param ProviderConfig provider_config: A config object
representing the provider.
def __init__(self, provider_config):
def getCreateStateMachine(self, hostname, label, metadata, retries):
"""Return a state machine suitable for creating an instance
This method should return a new state machine object
initialized to create the described node.
:param str hostname: The hostname of the node.
:param ProviderLabel label: A config object representing the
provider-label for the node.
:param metadata dict: A dictionary of metadata that must be
stored on the instance in the cloud. The same data must be
able to be returned later on :py:class:`Instance` objects
returned from `listInstances`.
:param retries int: The number of attempts which should be
made to launch the node.
:returns: A :py:class:`StateMachine` object.
raise NotImplementedError()
def getDeleteStateMachine(self, external_id):
"""Return a state machine suitable for deleting an instance
This method should return a new state machine object
initialized to delete the described instance.
:param str external_id: The external_id of the instance, as
supplied by a creation StateMachine or an Instance.
raise NotImplementedError()
def listInstances(self):
"""Return an iterator of instances accessible to this provider.
The yielded values should represent all instances accessible
to this provider, not only those under the control of this
adapter, but all visible instances in order to achive accurate
quota calculation.
:returns: A generator of :py:class:`Instance` objects.
raise NotImplementedError()
def cleanupLeakedResources(self, known_nodes, metadata):
"""Delete any leaked resources
Use the supplied metadata to delete any leaked resources.
Typically you would list all of the instances in the provider,
then list any other resources (floating IPs, etc). Any
resources with matching metadata not associated with an
instance should be deleted, and any instances with matching
metadata and whose node id is not in known_nodes should also
be deleted.
Look up the `nodepool_node_id` metadata attribute on cloud
instances to compare to known_nodes.
:param set known_nodes: A set of string values representing
known node ids.
:param dict metadata: Metadata values to compare with cloud
raise NotImplementedError()
def getQuotaLimits(self):
"""Return the quota limits for this provider
The default implementation returns a simple QuotaInformation
with no limits. Override this to provide accurate
:returns: A :py:class:`QuotaInformation` object.
return QuotaInformation(default=math.inf)
def getQuotaForLabel(self, label):
"""Return information about the quota used for a label
The default implementation returns a simple QuotaInformation
for one instance; override this to return more detailed
information including cores and RAM.
:param ProviderLabel label: A config object describing
a label for an instance.
:returns: A :py:class:`QuotaInformation` object.
return QuotaInformation(instances=1)


@ -291,3 +291,37 @@ class QuotaSupport:
self.log.exception("Couldn't consider invalid node %s "
"for quota:" % node)
return used_quota
class RateLimiter:
"""A Rate limiter
:param str name: The provider name; used in logging.
:param float rate_limit: The rate limit expressed in
requests per second.
.. code:: python
rate_limiter = RateLimiter('provider', 1.0)
with rate_limiter:
def __init__(self, name, rate_limit):
self._running = True = name = 1.0 / rate_limit
self.last_ts = None
def __enter__(self):
if self.last_ts is None:
while True:
delta = time.monotonic() - self.last_ts
if delta >=
time.sleep( - delta)
def __exit__(self, etype, value, tb):
self.last_ts = time.monotonic()