181 lines
6.8 KiB
Python
181 lines
6.8 KiB
Python
# Copyright 2018 Red Hat
|
|
# Copyright 2023 Acme Gating, LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import math
|
|
import logging
|
|
|
|
from kazoo import exceptions as kze
|
|
|
|
from nodepool import exceptions
|
|
from nodepool.zk import zookeeper as zk
|
|
from nodepool.driver import NodeRequestHandler
|
|
from nodepool.driver.utils import NodeLauncher, QuotaInformation
|
|
|
|
|
|
class OpenshiftLauncher(NodeLauncher):
|
|
def __init__(self, handler, node, provider_config, provider_label):
|
|
super().__init__(handler, node, provider_config)
|
|
self.label = provider_label
|
|
self._retries = provider_config.launch_retries
|
|
|
|
def _launchLabel(self):
|
|
self.log.debug("Creating resource")
|
|
project = "%s-%s" % (self.handler.pool.name, self.node.id)
|
|
self.node.external_id = self.handler.manager.createProject(
|
|
self.node, self.handler.pool.name, project, self.label,
|
|
self.handler.request)
|
|
self.zk.storeNode(self.node)
|
|
|
|
resource = self.handler.manager.prepareProject(project)
|
|
if self.label.type == "pod":
|
|
self.handler.manager.createPod(
|
|
self.node, self.handler.pool.name,
|
|
project, self.label.name, self.label, self.handler.request)
|
|
self.handler.manager.waitForPod(project, self.label.name)
|
|
resource['pod'] = self.label.name
|
|
self.node.connection_type = "kubectl"
|
|
self.node.interface_ip = self.label.name
|
|
else:
|
|
self.node.connection_type = "project"
|
|
|
|
self.node.state = zk.READY
|
|
self.node.python_path = self.label.python_path
|
|
self.node.shell_type = self.label.shell_type
|
|
# NOTE: resource access token may be encrypted here
|
|
self.node.connection_port = resource
|
|
pool = self.handler.provider.pools.get(self.node.pool)
|
|
self.node.resources = self.handler.manager.quotaNeededByLabel(
|
|
self.node.type[0], pool).get_resources()
|
|
self.node.cloud = self.provider_config.context
|
|
self.zk.storeNode(self.node)
|
|
self.log.info("Resource %s is ready", project)
|
|
|
|
def launch(self):
|
|
attempts = 1
|
|
while attempts <= self._retries:
|
|
try:
|
|
self._launchLabel()
|
|
break
|
|
except kze.SessionExpiredError:
|
|
# If we lost our ZooKeeper session, we've lost our node lock
|
|
# so there's no need to continue.
|
|
raise
|
|
except Exception as e:
|
|
if attempts <= self._retries:
|
|
self.log.exception(
|
|
"Launch attempt %d/%d failed for node %s:",
|
|
attempts, self._retries, self.node.id)
|
|
# If we created an instance, delete it.
|
|
if self.node.external_id:
|
|
self.handler.manager.cleanupNode(self.node.external_id)
|
|
self.handler.manager.waitForNodeCleanup(
|
|
self.node.external_id)
|
|
self.node.external_id = None
|
|
self.node.interface_ip = None
|
|
self.zk.storeNode(self.node)
|
|
if 'exceeded quota' in str(e).lower():
|
|
self.log.info("%s: quota exceeded", self.node.id)
|
|
raise exceptions.QuotaException("Quota exceeded")
|
|
if attempts == self._retries:
|
|
raise
|
|
attempts += 1
|
|
|
|
|
|
class OpenshiftNodeRequestHandler(NodeRequestHandler):
|
|
log = logging.getLogger("nodepool.driver.openshift."
|
|
"OpenshiftNodeRequestHandler")
|
|
|
|
def __init__(self, pw, request):
|
|
super().__init__(pw, request)
|
|
self._threads = []
|
|
|
|
@property
|
|
def alive_thread_count(self):
|
|
count = 0
|
|
for t in self._threads:
|
|
if t.is_alive():
|
|
count += 1
|
|
return count
|
|
|
|
def imagesAvailable(self):
|
|
return True
|
|
|
|
def launchesComplete(self):
|
|
'''
|
|
Check if all launch requests have completed.
|
|
|
|
When all of the Node objects have reached a final state (READY or
|
|
FAILED), we'll know all threads have finished the launch process.
|
|
'''
|
|
if not self._threads:
|
|
return True
|
|
|
|
# Give the NodeLaunch threads time to finish.
|
|
if self.alive_thread_count:
|
|
return False
|
|
|
|
node_states = [node.state for node in self.nodeset]
|
|
|
|
# NOTE: It's very important that NodeLauncher always sets one of
|
|
# these states, no matter what.
|
|
if not all(s in (zk.READY, zk.FAILED, zk.ABORTED)
|
|
for s in node_states):
|
|
return False
|
|
|
|
return True
|
|
|
|
def hasRemainingQuota(self, ntype):
|
|
'''
|
|
Checks if the predicted quota is enough for an additional node of type
|
|
ntype.
|
|
|
|
:param ntype: node type for the quota check
|
|
:return: True if there is enough quota, False otherwise
|
|
'''
|
|
needed_quota = self.manager.quotaNeededByLabel(ntype, self.pool)
|
|
|
|
# Calculate remaining quota which is calculated as:
|
|
# quota = <total nodepool quota> - <used quota> - <quota for node>
|
|
cloud_quota = self.manager.estimatedNodepoolQuota()
|
|
cloud_quota.subtract(
|
|
self.manager.estimatedNodepoolQuotaUsed())
|
|
cloud_quota.subtract(needed_quota)
|
|
self.log.debug("Predicted remaining provider quota: %s",
|
|
cloud_quota)
|
|
|
|
if not cloud_quota.non_negative():
|
|
return False
|
|
|
|
# Now calculate pool specific quota. Values indicating no quota default
|
|
# to math.inf representing infinity that can be calculated with.
|
|
args = dict(cores=getattr(self.pool, 'max_cores', None),
|
|
instances=self.pool.max_servers,
|
|
ram=getattr(self.pool, 'max_ram', None))
|
|
args.update(self.pool.max_resources)
|
|
pool_quota = QuotaInformation(**args, default=math.inf)
|
|
pool_quota.subtract(
|
|
self.manager.estimatedNodepoolQuotaUsed(self.pool))
|
|
self.log.debug("Current pool quota: %s" % pool_quota)
|
|
pool_quota.subtract(needed_quota)
|
|
self.log.debug("Predicted remaining pool quota: %s", pool_quota)
|
|
|
|
return pool_quota.non_negative()
|
|
|
|
def launch(self, node):
|
|
label = self.pool.labels[node.type[0]]
|
|
thd = OpenshiftLauncher(self, node, self.provider, label)
|
|
thd.start()
|
|
self._threads.append(thd)
|