665 lines
23 KiB

# Copyright (C) 2018 Red Hat
# Copyright 2022 Acme Gating, LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import concurrent.futures
import copy
import logging
import math
import os
import threading
import time
from collections import defaultdict
from kazoo import exceptions as kze
from nodepool import exceptions
from nodepool import stats
from nodepool.zk import zookeeper as zk
from nodepool.logconfig import get_annotated_logger
MAX_QUOTA_AGE = 5 * 60 # How long to keep the quota information cached
class NodeLauncher(threading.Thread,
Class to launch a single node within a thread and record stats.
At this time, the implementing class must manage this thread.
def __init__(self, handler, node, provider_config):
:param NodeRequestHandler handler: The handler object.
:param Node node: A Node object describing the node to launch.
:param ProviderConfig provider_config: A ProviderConfig object
describing the provider launching this node.
threading.Thread.__init__(self, name="NodeLauncher-%s" % node.id)
launcher = handler.pw.nodepool
stats.StatsReporter.__init__(self, launcher.statsd)
logger = logging.getLogger("nodepool.NodeLauncher")
request = handler.request
self.log = get_annotated_logger(logger,
self.handler = handler
self.zk = handler.zk
self.node = node
self.provider_config = provider_config
def launch(self):
def run(self):
start_time = time.monotonic()
statsd_key = 'ready'
except kze.SessionExpiredError:
# Our node lock is gone, leaving the node state as BUILDING.
# This will get cleaned up in ZooKeeper automatically, but we
# must still set our cached node state to FAILED for the
# NodeLaunchManager's poll() method.
"Lost ZooKeeper session trying to launch for node %s",
self.node.state = zk.FAILED
statsd_key = 'error.zksession'
except exceptions.QuotaException:
# We encountered a quota error when trying to launch a
# node. In this case we need to abort the launch. The upper
# layers will take care of this and reschedule a new node once
# the quota is ok again.
self.log.info("Aborting node %s due to quota failure" %
self.node.state = zk.ABORTED
statsd_key = 'error.quota'
except Exception as e:
"Launch failed for node %s: tenant=%s label=%s hostname=%s",
self.node.state = zk.FAILED
if hasattr(e, 'statsd_key'):
statsd_key = e.statsd_key
statsd_key = 'error.unknown'
dt = int((time.monotonic() - start_time) * 1000)
self.recordLaunchStats(statsd_key, dt)
except Exception:
self.log.exception("Exception while reporting stats:")
class NodeDeleter(threading.Thread):
log = logging.getLogger("nodepool.NodeDeleter")
def __init__(self, zk, provider_manager, node):
threading.Thread.__init__(self, name='NodeDeleter for %s %s' %
(node.provider, node.external_id))
self._zk = zk
self._provider_manager = provider_manager
self._node = node
def delete(zk_conn, manager, node, node_exists=True):
Delete a server instance and ZooKeeper node.
This is a class method so we can support instantaneous deletes.
:param ZooKeeper zk_conn: A ZooKeeper object to use.
:param ProviderManager provider_manager: ProviderManager object to
use fo deleting the server.
:param Node node: A locked Node object that describes the server to
:param bool node_exists: True if the node actually exists in ZooKeeper.
An artifical Node object can be passed that can be used to delete
a leaked instance.
node.state = zk.DELETING
if node.external_id:
except exceptions.NotFound:
NodeDeleter.log.info("Instance %s not found in provider %s",
node.external_id, node.provider)
except Exception:
"Exception deleting instance %s from %s:",
node.external_id, node.provider)
# Don't delete the ZK node in this case, but do unlock it
if node_exists:
if node_exists:
"Deleting ZK node id=%s, state=%s, external_id=%s",
node.id, node.state, node.external_id)
# This also effectively releases the lock
def run(self):
# Since leaked instances won't have an actual node in ZooKeeper,
# we need to check 'id' to see if this is an artificial Node.
if self._node.id is None:
node_exists = False
node_exists = True
self.delete(self._zk, self._provider_manager,
self._node, node_exists)
except Exception:
self.log.exception("Error deleting node %s:", self._node)
class QuotaInformation:
def __init__(self, cores=None, instances=None, ram=None, default=0, **kw):
Initializes the quota information with some values. None values will
be initialized with default which will be typically 0 or math.inf
indicating an infinite limit.
:param cores: An integer number of (v)CPU cores.
:param instances: An integer number of instances.
:param ram: An integer amount of RAM in Mebibytes.
:param default: The default value to use for any attribute not supplied
(usually 0 or math.inf).
# Note that the self.quota['compute'] map is inserted into ZK as
# a node property (via get_resources) and is consumed by Zuul to
# calculate resource usage. Thus care should be taken if
# modifying fields below.
self.quota = {
'compute': {
'cores': self._get_default(cores, default),
'instances': self._get_default(instances, default),
'ram': self._get_default(ram, default),
for k, v in kw.items():
self.quota['compute'][k] = v
self.default = default
def _get_default(self, value, default):
return value if value is not None else default
def _add_subtract(self, other, add=True):
for category in other.quota.keys():
self.quota.setdefault(category, {})
for resource in other.quota[category].keys():
self.quota[category].setdefault(resource, self.default)
for category in self.quota.keys():
for resource in self.quota[category].keys():
second_value = other.quota.get(category, {}).get(
resource, other.default)
if add:
self.quota[category][resource] += second_value
self.quota[category][resource] -= second_value
def subtract(self, other):
self._add_subtract(other, add=False)
def add(self, other):
self._add_subtract(other, True)
def non_negative(self):
for key_i, category in self.quota.items():
for resource, value in category.items():
if value < 0:
return False
return True
def get_resources(self):
'''Return resources value to register in ZK node'''
return self.quota['compute']
def from_resources(resources):
qi = QuotaInformation()
qi.quota['compute'] = resources
return qi
def __str__(self):
return str(self.quota)
class QuotaSupport:
"""A mix-in class for providers to supply quota support methods"""
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self._current_nodepool_quota_timestamp = 0
self._current_nodepool_quota = {}
def quotaNeededByLabel(self, label, pool):
"""Return quota information about a label
:param str label: The label name
:param ProviderPool pool: A ProviderPool config object with the label
:return: QuotaInformation about the label
def unmanagedQuotaUsed(self):
Sums up the quota used by servers unmanaged by nodepool.
:return: Calculated quota in use by unmanaged servers
def getProviderLimits(self):
Get the resource limits from the provider.
:return: QuotaInformation about the label
def invalidateQuotaCache(self):
self._current_nodepool_quota_timestamp = 0
def estimatedNodepoolQuota(self):
Determine how much quota is available for nodepool managed
resources. This needs to take into account the quota of the
tenant, resources used outside of nodepool, and the max
settings in nodepool config. This is cached for MAX_QUOTA_AGE
:return: Total amount of resources available which is currently
available to nodepool including currently existing nodes.
if self._current_nodepool_quota:
now = time.time()
if now < self._current_nodepool_quota_timestamp + MAX_QUOTA_AGE:
return copy.deepcopy(self._current_nodepool_quota)
# This is initialized with the full tenant quota and later becomes
# the quota available for nodepool.
nodepool_quota = self.getProviderLimits()
except Exception:
if self._current_nodepool_quota:
self.log.exception("Unable to get provider quota, "
"using cached value")
return copy.deepcopy(self._current_nodepool_quota)
self.log.debug("Provider quota for %s: %s",
self.provider.name, nodepool_quota)
# Subtract the unmanaged quota usage from nodepool_max
# to get the quota available for us.
self._current_nodepool_quota = nodepool_quota
self._current_nodepool_quota_timestamp = time.time()
self.log.debug("Available quota for %s: %s",
self.provider.name, nodepool_quota)
return copy.deepcopy(nodepool_quota)
def estimatedNodepoolQuotaUsed(self, pool=None):
Sums up the quota used (or planned) currently by nodepool. If pool is
given it is filtered by the pool.
:param pool: If given, filtered by the pool.
:return: Calculated quota in use by nodepool
used_quota = QuotaInformation()
for node in self._zk.nodeIterator(cached_ids=True):
if node.provider == self.provider.name:
if pool and not node.pool == pool.name:
provider_pool = self.provider.pools.get(node.pool)
if not provider_pool:
"Cannot find provider pool for node %s" % node)
# This node is in a funny state we log it for debugging
# but move on and don't account it as we can't properly
# calculate its cost without pool info.
if node.type[0] not in provider_pool.labels:
self.log.warning("Node type is not in provider pool "
"for node %s" % node)
# This node is also in a funny state; the config
# may have changed under it. It should settle out
# eventually when it's deleted.
# If the node resources is not None, we can use that to
# construct the qi object for the node.
if node.resources:
node_resources = QuotaInformation.from_resources(
node_resources = self.quotaNeededByLabel(
node.type[0], provider_pool)
except Exception:
self.log.exception("Couldn't consider invalid node %s "
"for quota:" % node)
return used_quota
def getLabelQuota(self):
"""Return available quota per label.
:returns: Mapping of labels to available quota
return defaultdict(lambda: math.inf)
class RateLimitInstance:
def __init__(self, limiter, logger, msg):
self.limiter = limiter
self.logger = logger
self.msg = msg
def __enter__(self):
self.delay = self.limiter._enter()
self.start_time = time.monotonic()
def __exit__(self, etype, value, tb):
end_time = time.monotonic()
self.limiter._exit(etype, value, tb)
self.logger("%s in %ss after %ss delay",
end_time - self.start_time,
class RateLimiter:
"""A Rate limiter
:param str name: The provider name; used in logging.
:param float rate_limit: The rate limit expressed in
requests per second.
.. code:: python
rate_limiter = RateLimiter('provider', 1.0)
with rate_limiter:
You can optionally use the limiter as a callable in which case it
will log a supplied message with timing information.
.. code:: python
rate_limiter = RateLimiter('provider', 1.0)
with rate_limiter(log.debug, "an API call"):
def __init__(self, name, rate_limit):
self._running = True
self.name = name
if not rate_limit:
self.delta = 0.0
self.delta = 1.0 / rate_limit
self.last_ts = None
self.lock = threading.Lock()
def __call__(self, logmethod, msg):
return RateLimitInstance(self, logmethod, msg)
def __enter__(self):
def _enter(self):
with self.lock:
total_delay = 0.0
if self.last_ts is None:
self.last_ts = time.monotonic()
return total_delay
while True:
now = time.monotonic()
delta = now - self.last_ts
if delta >= self.delta:
delay = self.delta - delta
total_delay += delay
self.last_ts = time.monotonic()
return total_delay
def __exit__(self, etype, value, tb):
self._exit(etype, value, tb)
def _exit(self, etype, value, tb):
class LazyExecutorTTLCache:
"""This is a lazy executor TTL cache.
It's lazy because if it has cached data, it will always return it
It's executor based, which means that if a cache miss occurs, it
will submit a task to an executor to fetch new data.
Finally, it's a TTL cache, which means it automatically expires data.
Since it is only expected to be used when caching provider
resource listing methods, it assumes there will only be one entry
and ignores arguments -- it will return the same cached data no
matter what arguments are supplied; but it will pass on those
arguments to the underlying method in a cache miss.
:param numeric ttl: The cache timeout in seconds.
:param concurrent.futures.Executor executor: An executor to use to
update data asynchronously in case of a cache miss.
def __init__(self, ttl, executor):
self.ttl = ttl
self.executor = executor
# If we have an outstanding update being run by the executor,
# this is the future.
self.future = None
# The last time the underlying method completed.
self.last_time = None
# The last value from the underlying method.
self.last_value = None
# A lock to make all of this thread safe (especially to ensure
# we don't fire off multiple updates).
self.lock = threading.Lock()
def __call__(self, func):
def decorator(*args, **kw):
with self.lock:
now = time.monotonic()
if self.future and self.future.done():
# If a previous call spawned an update, resolve
# that now so we can use the data.
self.last_time, self.last_value = self.future.result()
# Clear the future regardless so we don't loop.
self.future = None
if (self.last_time is not None and
now - self.last_time < self.ttl):
# A cache hit.
return self.last_value
# The rest of the method is a cache miss.
if self.last_time is not None:
if not self.future:
# Fire off an asynchronous update request.
# This second wrapper ensures that we record
# the time that the update is complete along
# with the value.
def func_with_time():
ret = func(*args, **kw)
now = time.monotonic()
return (now, ret)
self.future = self.executor.submit(func_with_time)
# This is the first time this method has been
# called; since we don't have any cached data, we
# will synchronously update the data.
self.last_value = func(*args, **kw)
self.last_time = time.monotonic()
return self.last_value
return decorator
class Segment:
def __init__(self, index, offset, data):
self.index = index
self.offset = offset
self.data = data
class ImageUploader:
A helper class for drivers that upload large images in chunks.
# These values probably don't need to be changed
error_retries = 3
concurrency = 10
# Subclasses must implement these
segment_size = None
def __init__(self, adapter, log, path, image_name, metadata):
if self.segment_size is None:
raise Exception("Subclass must set block size")
self.adapter = adapter
self.log = log
self.path = path
self.size = os.path.getsize(path)
self.image_name = image_name
self.metadata = metadata
self.timeout = None
def shouldRetryException(self, exception):
return True
def uploadSegment(self, segment):
def startUpload(self):
def finishUpload(self):
def abortUpload(self):
# Main API
def upload(self, timeout=None):
if timeout:
self.timeout = time.monotonic() + timeout
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.concurrency) as executor:
with open(self.path, 'rb') as image_file:
self._uploadInner(executor, image_file)
return self.finishUpload()
except Exception:
self.log.exception("Error uploading image:")
# Subclasses can use this helper method for wrapping retryable calls
def retry(self, func, *args, **kw):
for x in range(self.error_retries):
return func(*args, **kw)
except Exception as e:
if not self.shouldRetryException(e):
if x + 1 >= self.error_retries:
time.sleep(2 * x)
def getTimeout(self):
if self.timeout is None:
return None
return self.timeout - time.monotonic()
def checkTimeout(self):
if self.timeout is None:
if self.getTimeout() < 0:
raise Exception("Timed out uploading image")
# Internal methods
def _uploadInner(self, executor, image_file):
futures = set()
for index, offset in enumerate(range(0, self.size, self.segment_size)):
segment = Segment(index, offset,
future = executor.submit(self.uploadSegment, segment)
# Keep the pool of workers supplied with data but without
# reading the entire file into memory.
if len(futures) >= (self.concurrency * 2):
(done, futures) = concurrent.futures.wait(
for future in done:
# Only check the timeout after waiting (not every pass
# through the loop)
# We're done reading the file, wait for all uploads to finish
(done, futures) = concurrent.futures.wait(
for future in done: