Merge "Support threadless deletes"

Zuul 2021-06-11 14:19:00 +00:00 committed by Gerrit Code Review
commit ee22b88ab5
15 changed files with 252 additions and 106 deletions

View File

@@ -261,7 +261,8 @@ class NodePoolCmd(NodepoolApp):
            provider = self.pool.config.providers[node.provider]
            manager = provider_manager.get_provider(provider)
            manager.start(self.zk)
-           launcher.NodeDeleter.delete(self.zk, manager, node)
+           node_deleter = manager.startNodeCleanup(node)
+           node_deleter.join()
            manager.stop()
        else:
            node.state = zk.DELETING

View File

@@ -221,6 +221,19 @@ class Provider(ProviderNotifications):
         """
         pass
 
+    def startNodeCleanup(self, node):
+        '''Starts a background process to delete a node
+
+        This should return a NodeDeleter to implement and track the
+        deletion of a node.
+
+        :param Node node: A locked Node object representing the
+            instance to delete.
+
+        :returns: A NodeDeleter instance.
+        '''
+        pass
+
     @abc.abstractmethod
     def cleanupNode(self, node_id):
         """Cleanup a node after use
@@ -798,7 +811,7 @@ class NodeRequestHandler(NodeRequestHandlerNotifications,
     def launchesComplete(self):
         '''
         Handler needs to implement this to check if all nodes in self.nodeset
-        have completed the launch sequence..
+        have completed the launch sequence.
 
         This method will be called periodically to check on launch progress.
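For context, the new startNodeCleanup() hook is implemented the same way in each of the thread-based drivers changed below, and callers either fire-and-forget the returned deleter or join it. The following is a minimal illustrative sketch, not part of the change itself; ExampleProvider and delete_now are hypothetical names, but the method bodies mirror the driver changes below and the nodepoolcmd change above.

# Illustrative sketch only (not part of this change).
from nodepool.driver import Provider
from nodepool.driver.utils import NodeDeleter


class ExampleProvider(Provider):
    """Hypothetical thread-based driver implementing the new hook."""

    def start(self, zk_conn):
        # Keep the ZooKeeper connection so the deleter can update node state.
        self._zk = zk_conn

    def startNodeCleanup(self, node):
        # Kick off a background delete and return the tracker to the caller.
        t = NodeDeleter(self._zk, self, node)
        t.start()
        return t


def delete_now(manager, node):
    # Synchronous use, as nodepoolcmd does: start the cleanup, then wait.
    node_deleter = manager.startNodeCleanup(node)
    node_deleter.join()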

View File

@@ -18,6 +18,7 @@ import botocore.exceptions
 
 import nodepool.exceptions
 from nodepool.driver import Provider
+from nodepool.driver.utils import NodeDeleter
 from nodepool.driver.aws.handler import AwsNodeRequestHandler
@@ -53,6 +54,7 @@ class AwsProvider(Provider):
         return AwsNodeRequestHandler(poolworker, request)
 
     def start(self, zk_conn):
+        self._zk = zk_conn
         if self.ec2 is not None:
             return True
         self.log.debug("Starting")
@@ -152,6 +154,11 @@ class AwsProvider(Provider):
         # TODO: remove leaked resources if any
         pass
 
+    def startNodeCleanup(self, node):
+        t = NodeDeleter(self._zk, self, node)
+        t.start()
+        return t
+
     def cleanupNode(self, server_id):
         if self.ec2 is None:
             return False

View File

@@ -16,6 +16,7 @@ import logging
 import json
 
 from nodepool.driver import Provider
+from nodepool.driver.utils import NodeDeleter
 from nodepool.driver.azure import handler
 from nodepool import zk
@@ -196,6 +197,11 @@ class AzureProvider(Provider):
             node.state = zk.DELETING
             self._zk.storeNode(node)
 
+    def startNodeCleanup(self, node):
+        t = NodeDeleter(self._zk, self, node)
+        t.start()
+        return t
+
     def cleanupNode(self, server_id):
         self.log.debug('Server ID: %s' % server_id)
         try:

View File

@@ -25,6 +25,7 @@ from nodepool import exceptions
 from nodepool.driver import Provider
 from nodepool.driver.kubernetes import handler
 from nodepool.driver.utils import QuotaInformation, QuotaSupport
+from nodepool.driver.utils import NodeDeleter
 
 urllib3.disable_warnings()
@@ -121,6 +122,11 @@ class KubernetesProvider(Provider, QuotaSupport):
     def cleanupLeakedResources(self):
         pass
 
+    def startNodeCleanup(self, node):
+        t = NodeDeleter(self._zk, self, node)
+        t.start()
+        return t
+
     def cleanupNode(self, server_id):
         if not self.ready:
             return

View File

@@ -23,6 +23,7 @@ from openshift.dynamic import DynamicClient as os_client
 
 from nodepool import exceptions
 from nodepool.driver import Provider
+from nodepool.driver.utils import NodeDeleter
 from nodepool.driver.openshift import handler
 
 urllib3.disable_warnings()
@@ -54,6 +55,7 @@ class OpenshiftProvider(Provider):
 
     def start(self, zk_conn):
         self.log.debug("Starting")
+        self._zk = zk_conn
         if self.ready or not self.os_client or not self.k8s_client:
             return
         self.ready = True
@@ -103,6 +105,11 @@ class OpenshiftProvider(Provider):
     def cleanupLeakedResources(self):
         pass
 
+    def startNodeCleanup(self, node):
+        t = NodeDeleter(self._zk, self, node)
+        t.start()
+        return t
+
     def cleanupNode(self, server_id):
         if not self.ready:
             return

View File

@@ -20,6 +20,7 @@ import time
 from kubernetes import client as k8s_client
 from kubernetes import config as k8s_config
 
+from nodepool.driver.utils import NodeDeleter
 from nodepool.driver.openshift.provider import OpenshiftProvider
 from nodepool.driver.openshiftpods import handler
@@ -61,6 +62,7 @@ class OpenshiftPodsProvider(OpenshiftProvider):
 
     def start(self, zk_conn):
         self.log.debug("Starting")
+        self._zk = zk_conn
         if self.ready or not self.k8s_client:
             return
         self.ready = True
@@ -103,6 +105,11 @@ class OpenshiftPodsProvider(OpenshiftProvider):
                 return pool, pod_name
         return None, None
 
+    def startNodeCleanup(self, node):
+        t = NodeDeleter(self._zk, self, node)
+        t.start()
+        return t
+
     def cleanupNode(self, server_id):
         if not self.ready:
             return

View File

@@ -25,6 +25,7 @@ from openstack.exceptions import ResourceTimeout
 from nodepool import exceptions
 from nodepool.driver import Provider
 from nodepool.driver.utils import QuotaInformation, QuotaSupport
+from nodepool.driver.utils import NodeDeleter
 from nodepool import stats
 from nodepool import version
 from nodepool import zk
@@ -433,6 +434,11 @@ class OpenStackProvider(Provider, QuotaSupport):
     def deleteServer(self, server_id):
         return self._client.delete_server(server_id, delete_ips=True)
 
+    def startNodeCleanup(self, node):
+        t = NodeDeleter(self._zk, self, node)
+        t.start()
+        return t
+
     def cleanupNode(self, server_id):
         server = self.getServer(server_id)
         if not server:

View File

@@ -19,6 +19,7 @@ import math
 from nodepool.driver.taskmanager import BaseTaskManagerProvider, Task
 from nodepool.driver import Driver, NodeRequestHandler
 from nodepool.driver.utils import NodeLauncher, QuotaInformation, QuotaSupport
+from nodepool.driver.utils import NodeDeleter
 from nodepool.nodeutils import iterate_timeout, nodescan
 from nodepool import exceptions
 from nodepool import zk
@@ -355,6 +356,11 @@ class SimpleTaskManagerProvider(BaseTaskManagerProvider, QuotaSupport):
 
         return used_quota
 
+    def startNodeCleanup(self, node):
+        t = NodeDeleter(self._zk, self, node)
+        t.start()
+        return t
+
     def cleanupNode(self, external_id):
         instance = self.getInstance(external_id)
         if (not instance) or instance.deleted:

View File

@@ -22,7 +22,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
 
 from nodepool.driver import Driver, NodeRequestHandler, Provider
 from nodepool.driver.utils import QuotaInformation, QuotaSupport
-from nodepool.nodeutils import iterate_timeout, nodescan
+from nodepool.nodeutils import nodescan
 from nodepool.logconfig import get_annotated_logger
 from nodepool import stats
 from nodepool import exceptions
@@ -204,6 +204,86 @@ class StateMachineNodeLauncher(stats.StatsReporter):
         return True
 
 
+class StateMachineNodeDeleter:
+    """The state of the state machine.
+
+    This driver collects state machines from the underlying cloud
+    adapter implementations; those state machines handle deleting the
+    node.  But we still have our own accounting and a little bit of
+    work to do on either side of that process, so this data structure
+    holds the extra information we need for that.
+    """
+
+    DELETE_TIMEOUT = 600
+
+    def __init__(self, zk, provider_manager, node):
+        # Based on utils.NodeDeleter
+        self.log = logging.getLogger("nodepool.StateMachineNodeDeleter")
+        self.manager = provider_manager
+        self.zk = zk
+        # Note: the node is locked
+        self.node = node
+        # Local additions:
+        self.start_time = time.monotonic()
+        self.state_machine = self.manager.adapter.getDeleteStateMachine(
+            node.external_id)
+
+    @property
+    def complete(self):
+        return self.state_machine.complete
+
+    def runStateMachine(self):
+        state_machine = self.state_machine
+        node = self.node
+        node_exists = (node.id is not None)
+
+        if self.state_machine.complete:
+            return True
+
+        try:
+            now = time.monotonic()
+            if now - state_machine.start_time > self.DELETE_TIMEOUT:
+                raise Exception("Timeout waiting for instance deletion")
+
+            if state_machine.state == state_machine.START:
+                node.state = zk.DELETING
+                self.zk.storeNode(node)
+
+            if node.external_id:
+                state_machine.advance()
+                self.log.debug(f"State machine for {node.id} at "
+                               f"{state_machine.state}")
+            if not self.state_machine.complete:
+                return
+        except exceptions.NotFound:
+            self.log.info(f"Instance {node.external_id} not found in "
+                          f"provider {node.provider}")
+        except Exception:
+            self.log.exception("Exception deleting instance "
+                               f"{node.external_id} from {node.provider}:")
+            # Don't delete the ZK node in this case, but do unlock it
+            if node_exists:
+                self.zk.unlockNode(node)
+            self.state_machine.complete = True
+            return
+
+        if node_exists:
+            self.log.info(
+                "Deleting ZK node id=%s, state=%s, external_id=%s",
+                node.id, node.state, node.external_id)
+            # This also effectively releases the lock
+            self.zk.deleteNode(node)
+            self.manager.nodeDeletedNotification(node)
+        return True
+
+    def join(self):
+        # This is used by the CLI for synchronous deletes
+        while self in self.manager.deleters:
+            time.sleep(0)
+
+
 class StateMachineHandler(NodeRequestHandler):
     log = logging.getLogger("nodepool.driver.simple."
                             "StateMachineHandler")
@@ -325,7 +405,8 @@
         super().__init__()
         self.provider = provider
         self.adapter = adapter
-        self.delete_state_machines = {}
+        # State machines
+        self.deleters = []
         self.launchers = []
         self._zk = None
         self.keyscan_worker = None
@@ -355,18 +436,23 @@
     def _runStateMachines(self):
         while self.running:
             to_remove = []
-            for launcher in self.launchers:
+            loop_start = time.monotonic()
+            for sm in self.deleters + self.launchers:
                 try:
-                    launcher.runStateMachine()
-                    if launcher.node.state != zk.BUILDING:
+                    sm.runStateMachine()
+                    if sm.complete:
                         self.log.debug("Removing state machine from runner")
-                        to_remove.append(launcher)
+                        to_remove.append(sm)
                 except Exception:
                     self.log.exception("Error running state machine:")
-            for launcher in to_remove:
-                self.launchers.remove(launcher)
-            if self.launchers:
-                time.sleep(0)
+            for sm in to_remove:
+                if sm in self.deleters:
+                    self.deleters.remove(sm)
+                if sm in self.launchers:
+                    self.launchers.remove(sm)
+            loop_end = time.monotonic()
+            if self.launchers or self.deleters:
+                time.sleep(max(0, 10 - (loop_end - loop_start)))
             else:
                 time.sleep(1)
@@ -424,28 +510,18 @@
 
         return used_quota
 
+    def startNodeCleanup(self, node):
+        nd = StateMachineNodeDeleter(self._zk, self, node)
+        self.deleters.append(nd)
+        return nd
+
     def cleanupNode(self, external_id):
-        # TODO: This happens in a thread-per-node in the launcher
-        # (above the driver level).  If we want to make this
-        # single-threaded (yes), we'll need to modify the launcher
-        # itself.
-        state_machine = self.adapter.getDeleteStateMachine(
-            external_id)
-        self.delete_state_machines[external_id] = state_machine
+        # This is no longer used due to our custom NodeDeleter
+        raise NotImplementedError()
 
     def waitForNodeCleanup(self, external_id, timeout=600):
-        try:
-            for count in iterate_timeout(
-                    timeout, exceptions.ServerDeleteException,
-                    "server %s deletion" % external_id):
-                sm = self.delete_state_machines[external_id]
-                sm.advance()
-                self.log.debug(f"State machine for {external_id} at "
-                               f"{sm.state}")
-                if sm.complete:
-                    return
-        finally:
-            self.delete_state_machines.pop(external_id, None)
+        # This is no longer used due to our custom NodeDeleter
+        raise NotImplementedError()
 
     def cleanupLeakedResources(self):
         known_nodes = set()
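The adapter-side delete state machine returned by getDeleteStateMachine() is not part of this diff. The rough sketch below is an assumption, not the real adapter API: the attribute names (START, state, start_time, complete, advance) are inferred from the StateMachineNodeDeleter added above, and the adapter calls it makes are hypothetical placeholders.

# Assumed shape only; delete_instance/instance_exists are hypothetical calls.
import time


class ExampleDeleteStateMachine:
    START = 'start'
    DELETING = 'deleting'
    COMPLETE = 'complete'

    def __init__(self, adapter, external_id):
        self.adapter = adapter
        self.external_id = external_id
        self.state = self.START
        self.complete = False
        self.start_time = time.monotonic()

    def advance(self):
        # Called repeatedly from the provider's single state-machine loop;
        # each call should make a small, non-blocking step and return.
        if self.state == self.START:
            self.adapter.delete_instance(self.external_id)   # hypothetical
            self.state = self.DELETING
        elif self.state == self.DELETING:
            if not self.adapter.instance_exists(self.external_id):  # hypothetical
                self.state = self.COMPLETE
                self.complete = True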

View File

@@ -24,6 +24,7 @@ from nodepool import exceptions
 from nodepool import nodeutils
 from nodepool import zk
 from nodepool.driver import Provider
+from nodepool.driver.utils import NodeDeleter
 from nodepool.driver.static.handler import StaticNodeRequestHandler
@@ -389,6 +390,11 @@ class StaticNodeProvider(Provider):
             for n in p.nodes
         }
 
+    def startNodeCleanup(self, node):
+        t = NodeDeleter(self.zk, self, node)
+        t.start()
+        return t
+
     def cleanupNode(self, server_id):
         return True

View File

@@ -16,6 +16,7 @@
 
 from nodepool.driver import Provider
 from nodepool.driver.test import handler
+from nodepool.driver.utils import NodeDeleter
 
 
 class TestProvider(Provider):
@@ -23,7 +24,7 @@ class TestProvider(Provider):
         self.provider = provider
 
     def start(self, zk_conn):
-        pass
+        self._zk = zk_conn
 
     def stop(self):
         pass
@@ -34,6 +35,11 @@ class TestProvider(Provider):
     def labelReady(self, name):
         return True
 
+    def startNodeCleanup(self, node):
+        t = NodeDeleter(self._zk, self, node)
+        t.start()
+        return t
+
     def cleanupNode(self, node_id):
         pass

View File

@@ -109,6 +109,73 @@ class NodeLauncher(threading.Thread,
             self.log.exception("Exception while reporting stats:")
 
 
+class NodeDeleter(threading.Thread):
+    log = logging.getLogger("nodepool.NodeDeleter")
+
+    def __init__(self, zk, provider_manager, node):
+        threading.Thread.__init__(self, name='NodeDeleter for %s %s' %
+                                  (node.provider, node.external_id))
+        self._zk = zk
+        self._provider_manager = provider_manager
+        self._node = node
+
+    @staticmethod
+    def delete(zk_conn, manager, node, node_exists=True):
+        '''
+        Delete a server instance and ZooKeeper node.
+
+        This is a class method so we can support instantaneous deletes.
+
+        :param ZooKeeper zk_conn: A ZooKeeper object to use.
+        :param ProviderManager provider_manager: ProviderManager object to
+            use for deleting the server.
+        :param Node node: A locked Node object that describes the server to
+            delete.
+        :param bool node_exists: True if the node actually exists in ZooKeeper.
+            An artificial Node object can be passed that can be used to delete
+            a leaked instance.
+        '''
+        try:
+            node.state = zk.DELETING
+            zk_conn.storeNode(node)
+            if node.external_id:
+                manager.cleanupNode(node.external_id)
+                manager.waitForNodeCleanup(node.external_id)
+        except exceptions.NotFound:
+            NodeDeleter.log.info("Instance %s not found in provider %s",
+                                 node.external_id, node.provider)
+        except Exception:
+            NodeDeleter.log.exception(
+                "Exception deleting instance %s from %s:",
+                node.external_id, node.provider)
+            # Don't delete the ZK node in this case, but do unlock it
+            if node_exists:
+                zk_conn.unlockNode(node)
+            return
+
+        if node_exists:
+            NodeDeleter.log.info(
+                "Deleting ZK node id=%s, state=%s, external_id=%s",
+                node.id, node.state, node.external_id)
+            # This also effectively releases the lock
+            zk_conn.deleteNode(node)
+            manager.nodeDeletedNotification(node)
+
+    def run(self):
+        # Since leaked instances won't have an actual node in ZooKeeper,
+        # we need to check 'id' to see if this is an artificial Node.
+        if self._node.id is None:
+            node_exists = False
+        else:
+            node_exists = True
+
+        try:
+            self.delete(self._zk, self._provider_manager,
+                        self._node, node_exists)
+        except Exception:
+            self.log.exception("Error deleting node %s:", self._node)
+
+
 class QuotaInformation:
 
     def __init__(self, cores=None, instances=None, ram=None, default=0):
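One use of the relocated NodeDeleter that its docstring describes is leaked-instance cleanup: passing an artificial Node whose id is still None makes run() call delete() with node_exists=False, so the ZooKeeper record is neither unlocked nor deleted at the end. A small sketch follows; delete_leaked_instance is a hypothetical helper, and it assumes zk.Node() can be constructed without arguments as other driver code does.

# Hypothetical helper, not from this change.
from nodepool import zk
from nodepool.driver.utils import NodeDeleter


def delete_leaked_instance(zk_conn, manager, provider_name, external_id):
    node = zk.Node()               # artificial Node: id stays None here
    node.external_id = external_id
    node.provider = provider_name
    t = NodeDeleter(zk_conn, manager, node)
    t.start()
    return t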

View File

@@ -46,73 +46,6 @@ LOCK_CLEANUP = 8 * HOURS
 SUSPEND_WAIT_TIME = 30
 
 
-class NodeDeleter(threading.Thread):
-    log = logging.getLogger("nodepool.NodeDeleter")
-
-    def __init__(self, zk, provider_manager, node):
-        threading.Thread.__init__(self, name='NodeDeleter for %s %s' %
-                                  (node.provider, node.external_id))
-        self._zk = zk
-        self._provider_manager = provider_manager
-        self._node = node
-
-    @staticmethod
-    def delete(zk_conn, manager, node, node_exists=True):
-        '''
-        Delete a server instance and ZooKeeper node.
-
-        This is a class method so we can support instantaneous deletes.
-
-        :param ZooKeeper zk_conn: A ZooKeeper object to use.
-        :param ProviderManager provider_manager: ProviderManager object to
-            use for deleting the server.
-        :param Node node: A locked Node object that describes the server to
-            delete.
-        :param bool node_exists: True if the node actually exists in ZooKeeper.
-            An artificial Node object can be passed that can be used to delete
-            a leaked instance.
-        '''
-        try:
-            node.state = zk.DELETING
-            zk_conn.storeNode(node)
-            if node.external_id:
-                manager.cleanupNode(node.external_id)
-                manager.waitForNodeCleanup(node.external_id)
-        except exceptions.NotFound:
-            NodeDeleter.log.info("Instance %s not found in provider %s",
-                                 node.external_id, node.provider)
-        except Exception:
-            NodeDeleter.log.exception(
-                "Exception deleting instance %s from %s:",
-                node.external_id, node.provider)
-            # Don't delete the ZK node in this case, but do unlock it
-            if node_exists:
-                zk_conn.unlockNode(node)
-            return
-
-        if node_exists:
-            NodeDeleter.log.info(
-                "Deleting ZK node id=%s, state=%s, external_id=%s",
-                node.id, node.state, node.external_id)
-            # This also effectively releases the lock
-            zk_conn.deleteNode(node)
-            manager.nodeDeletedNotification(node)
-
-    def run(self):
-        # Since leaked instances won't have an actual node in ZooKeeper,
-        # we need to check 'id' to see if this is an artificial Node.
-        if self._node.id is None:
-            node_exists = False
-        else:
-            node_exists = True
-
-        try:
-            self.delete(self._zk, self._provider_manager,
-                        self._node, node_exists)
-        except Exception:
-            self.log.exception("Error deleting node %s:", self._node)
-
-
 class PoolWorker(threading.Thread, stats.StatsReporter):
     '''
     Class that manages node requests for a single provider pool.
@@ -699,11 +632,8 @@ class DeletedNodeWorker(BaseCleanupWorker):
                 self.log.info("Deleting %s instance %s from %s",
                               node.state, node.external_id, node.provider)
                 try:
-                    t = NodeDeleter(
-                        self._nodepool.getZK(),
-                        self._nodepool.getProviderManager(node.provider),
-                        node)
-                    t.start()
+                    pm = self._nodepool.getProviderManager(node.provider)
+                    pm.startNodeCleanup(node)
                 except Exception:
                     self.log.exception("Could not delete instance %s on provider %s",
                                        node.external_id, node.provider)

View File

@@ -975,8 +975,10 @@ class TestLauncher(tests.DBTestCase):
         self.assertEqual(len(nodes), 1)
 
         self.zk.lockNode(nodes[0], blocking=False)
-        nodepool.launcher.NodeDeleter.delete(
-            self.zk, pool.getProviderManager('fake-provider'), nodes[0])
+
+        pm = pool.getProviderManager('fake-provider')
+        node_deleter = pm.startNodeCleanup(nodes[0])
+        node_deleter.join()
 
         # Make sure our old node is in delete state, even though delete failed
         deleted_node = self.zk.getNode(nodes[0].id)