Support threadless deletes

The launcher implements deletes using threads and, unlike with
launches, does not give drivers an opportunity to override that
behavior and handle deletes without threads (as we want to do in
the state machine driver).

To correct this, we move the NodeDeleter class from the launcher
to driver utils and add a new Provider method, startNodeCleanup,
that returns the NodeDeleter thread.  This is added in the base
Provider class so all drivers get this behavior by default.

In the state machine driver, we override the method so that instead
of returning a thread, we start a state machine and add it to a list
of state machines that our internal state machine runner thread
should drive.

Change-Id: Iddb7ed23c741824b5727fe2d89c9ddbfc01cd7d7
Author: James E. Blair
Date: 2021-03-09 17:20:03 -08:00
Commit: 63f38dfd6c (parent: 3d725e4386)
15 changed files with 252 additions and 106 deletions
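For orientation, a minimal sketch of the caller-side change, assembled from the hunks below ("manager" is a started provider manager and "node" a locked Node, as in the CLI and test hunks; everything else is elided):

# Before: callers drove the thread-based helper in the launcher directly:
#     launcher.NodeDeleter.delete(zk_conn, manager, node)
# After: callers ask the provider for a deleter object and, when a
# synchronous delete is wanted (as in the CLI and the tests), join() it.
node_deleter = manager.startNodeCleanup(node)
node_deleter.join()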


@ -261,7 +261,8 @@ class NodePoolCmd(NodepoolApp):
provider = self.pool.config.providers[node.provider]
manager = provider_manager.get_provider(provider)
manager.start(self.zk)
launcher.NodeDeleter.delete(self.zk, manager, node)
node_deleter = manager.startNodeCleanup(node)
node_deleter.join()
manager.stop()
else:
node.state = zk.DELETING


@ -221,6 +221,19 @@ class Provider(ProviderNotifications):
"""
pass
def startNodeCleanup(self, node):
'''Starts a background process to delete a node
This should return a NodeDeleter to implement and track the
deletion of a node.
:param Node node: A locked Node object representing the
instance to delete.
:returns: A NodeDeleter instance.
'''
pass
@abc.abstractmethod
def cleanupNode(self, node_id):
"""Cleanup a node after use
@ -797,7 +810,7 @@ class NodeRequestHandler(NodeRequestHandlerNotifications,
def launchesComplete(self):
'''
Handler needs to implement this to check if all nodes in self.nodeset
have completed the launch sequence..
have completed the launch sequence.
This method will be called periodically to check on launch progress.
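The base class leaves startNodeCleanup as a documented no-op, so each thread-based driver below wires up the NodeDeleter thread itself; the following diffs repeat the same three-line implementation, sketched here once with a placeholder FooProvider (not a real driver):

from nodepool.driver import Provider
from nodepool.driver.utils import NodeDeleter

class FooProvider(Provider):
    def start(self, zk_conn):
        # Drivers now keep a reference to the ZooKeeper connection so the
        # deleter thread can update and remove the node record.
        self._zk = zk_conn

    def startNodeCleanup(self, node):
        # Default, thread-based behavior: start a NodeDeleter and return it
        # so synchronous callers may join() it.
        t = NodeDeleter(self._zk, self, node)
        t.start()
        return t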


@ -18,6 +18,7 @@ import botocore.exceptions
import nodepool.exceptions
from nodepool.driver import Provider
from nodepool.driver.utils import NodeDeleter
from nodepool.driver.aws.handler import AwsNodeRequestHandler
@ -53,6 +54,7 @@ class AwsProvider(Provider):
return AwsNodeRequestHandler(poolworker, request)
def start(self, zk_conn):
self._zk = zk_conn
if self.ec2 is not None:
return True
self.log.debug("Starting")
@ -152,6 +154,11 @@ class AwsProvider(Provider):
# TODO: remove leaked resources if any
pass
def startNodeCleanup(self, node):
t = NodeDeleter(self._zk, self, node)
t.start()
return t
def cleanupNode(self, server_id):
if self.ec2 is None:
return False


@ -16,6 +16,7 @@ import logging
import json
from nodepool.driver import Provider
from nodepool.driver.utils import NodeDeleter
from nodepool.driver.azure import handler
from nodepool import zk
@ -196,6 +197,11 @@ class AzureProvider(Provider):
node.state = zk.DELETING
self._zk.storeNode(node)
def startNodeCleanup(self, node):
t = NodeDeleter(self._zk, self, node)
t.start()
return t
def cleanupNode(self, server_id):
self.log.debug('Server ID: %s' % server_id)
try:


@ -25,6 +25,7 @@ from nodepool import exceptions
from nodepool.driver import Provider
from nodepool.driver.kubernetes import handler
from nodepool.driver.utils import QuotaInformation, QuotaSupport
from nodepool.driver.utils import NodeDeleter
urllib3.disable_warnings()
@ -121,6 +122,11 @@ class KubernetesProvider(Provider, QuotaSupport):
def cleanupLeakedResources(self):
pass
def startNodeCleanup(self, node):
t = NodeDeleter(self._zk, self, node)
t.start()
return t
def cleanupNode(self, server_id):
if not self.ready:
return


@ -23,6 +23,7 @@ from openshift.dynamic import DynamicClient as os_client
from nodepool import exceptions
from nodepool.driver import Provider
from nodepool.driver.utils import NodeDeleter
from nodepool.driver.openshift import handler
urllib3.disable_warnings()
@ -54,6 +55,7 @@ class OpenshiftProvider(Provider):
def start(self, zk_conn):
self.log.debug("Starting")
self._zk = zk_conn
if self.ready or not self.os_client or not self.k8s_client:
return
self.ready = True
@ -103,6 +105,11 @@ class OpenshiftProvider(Provider):
def cleanupLeakedResources(self):
pass
def startNodeCleanup(self, node):
t = NodeDeleter(self._zk, self, node)
t.start()
return t
def cleanupNode(self, server_id):
if not self.ready:
return


@ -20,6 +20,7 @@ import time
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from nodepool.driver.utils import NodeDeleter
from nodepool.driver.openshift.provider import OpenshiftProvider
from nodepool.driver.openshiftpods import handler
@ -61,6 +62,7 @@ class OpenshiftPodsProvider(OpenshiftProvider):
def start(self, zk_conn):
self.log.debug("Starting")
self._zk = zk_conn
if self.ready or not self.k8s_client:
return
self.ready = True
@ -103,6 +105,11 @@ class OpenshiftPodsProvider(OpenshiftProvider):
return pool, pod_name
return None, None
def startNodeCleanup(self, node):
t = NodeDeleter(self._zk, self, node)
t.start()
return t
def cleanupNode(self, server_id):
if not self.ready:
return


@ -26,6 +26,7 @@ from openstack.exceptions import ResourceTimeout
from nodepool import exceptions
from nodepool.driver import Provider
from nodepool.driver.utils import QuotaInformation, QuotaSupport
from nodepool.driver.utils import NodeDeleter
from nodepool import stats
from nodepool import version
from nodepool import zk
@ -437,6 +438,11 @@ class OpenStackProvider(Provider, QuotaSupport):
def deleteServer(self, server_id):
return self._client.delete_server(server_id, delete_ips=True)
def startNodeCleanup(self, node):
t = NodeDeleter(self._zk, self, node)
t.start()
return t
def cleanupNode(self, server_id):
server = self.getServer(server_id)
if not server:


@ -19,6 +19,7 @@ import math
from nodepool.driver.taskmanager import BaseTaskManagerProvider, Task
from nodepool.driver import Driver, NodeRequestHandler
from nodepool.driver.utils import NodeLauncher, QuotaInformation, QuotaSupport
from nodepool.driver.utils import NodeDeleter
from nodepool.nodeutils import iterate_timeout, nodescan
from nodepool import exceptions
from nodepool import zk
@ -355,6 +356,11 @@ class SimpleTaskManagerProvider(BaseTaskManagerProvider, QuotaSupport):
return used_quota
def startNodeCleanup(self, node):
t = NodeDeleter(self._zk, self, node)
t.start()
return t
def cleanupNode(self, external_id):
instance = self.getInstance(external_id)
if (not instance) or instance.deleted:


@ -22,7 +22,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
from nodepool.driver import Driver, NodeRequestHandler, Provider
from nodepool.driver.utils import QuotaInformation, QuotaSupport
from nodepool.nodeutils import iterate_timeout, nodescan
from nodepool.nodeutils import nodescan
from nodepool.logconfig import get_annotated_logger
from nodepool import stats
from nodepool import exceptions
@ -204,6 +204,86 @@ class StateMachineNodeLauncher(stats.StatsReporter):
return True
class StateMachineNodeDeleter:
"""The state of the state machine.
This driver collects state machines from the underlying cloud
adapter implementations; those state machines handle deleting the
node. But we still have our own accounting and a little bit of
work to do on either side of that process, so this data structure
holds the extra information we need for that.
"""
DELETE_TIMEOUT = 600
def __init__(self, zk, provider_manager, node):
# Based on utils.NodeDeleter
self.log = logging.getLogger("nodepool.StateMachineNodeDeleter")
self.manager = provider_manager
self.zk = zk
# Note: the node is locked
self.node = node
# Local additions:
self.start_time = time.monotonic()
self.state_machine = self.manager.adapter.getDeleteStateMachine(
node.external_id)
@property
def complete(self):
return self.state_machine.complete
def runStateMachine(self):
state_machine = self.state_machine
node = self.node
node_exists = (node.id is not None)
if self.state_machine.complete:
return True
try:
now = time.monotonic()
if now - self.start_time > self.DELETE_TIMEOUT:
raise Exception("Timeout waiting for instance deletion")
if state_machine.state == state_machine.START:
node.state = zk.DELETING
self.zk.storeNode(node)
if node.external_id:
state_machine.advance()
self.log.debug(f"State machine for {node.id} at "
"{state_machine.state}")
if not self.state_machine.complete:
return
except exceptions.NotFound:
self.log.info(f"Instance {node.external_id} not found in "
"provider {node.provider}")
except Exception:
self.log.exception("Exception deleting instance "
f"{node.external_id} from {node.provider}:")
# Don't delete the ZK node in this case, but do unlock it
if node_exists:
self.zk.unlockNode(node)
self.state_machine.complete = True
return
if node_exists:
self.log.info(
"Deleting ZK node id=%s, state=%s, external_id=%s",
node.id, node.state, node.external_id)
# This also effectively releases the lock
self.zk.deleteNode(node)
self.manager.nodeDeletedNotification(node)
return True
def join(self):
# This is used by the CLI for synchronous deletes
while self in self.manager.deleters:
time.sleep(0)
class StateMachineHandler(NodeRequestHandler):
log = logging.getLogger("nodepool.driver.simple."
"StateMachineHandler")
@ -325,7 +405,8 @@ class StateMachineProvider(Provider, QuotaSupport):
super().__init__()
self.provider = provider
self.adapter = adapter
self.delete_state_machines = {}
# State machines
self.deleters = []
self.launchers = []
self._zk = None
self.keyscan_worker = None
@ -355,18 +436,23 @@ class StateMachineProvider(Provider, QuotaSupport):
def _runStateMachines(self):
while self.running:
to_remove = []
for launcher in self.launchers:
loop_start = time.monotonic()
for sm in self.deleters + self.launchers:
try:
launcher.runStateMachine()
if launcher.node.state != zk.BUILDING:
sm.runStateMachine()
if sm.complete:
self.log.debug("Removing state machine from runner")
to_remove.append(launcher)
to_remove.append(sm)
except Exception:
self.log.exception("Error running state machine:")
for launcher in to_remove:
self.launchers.remove(launcher)
if self.launchers:
time.sleep(0)
for sm in to_remove:
if sm in self.deleters:
self.deleters.remove(sm)
if sm in self.launchers:
self.launchers.remove(sm)
loop_end = time.monotonic()
if self.launchers or self.deleters:
time.sleep(max(0, 10 - (loop_end - loop_start)))
else:
time.sleep(1)
@ -424,28 +510,18 @@ class StateMachineProvider(Provider, QuotaSupport):
return used_quota
def startNodeCleanup(self, node):
nd = StateMachineNodeDeleter(self._zk, self, node)
self.deleters.append(nd)
return nd
def cleanupNode(self, external_id):
# TODO: This happens in a thread-per-node in the launcher
# (above the driver level). If we want to make this
# single-threaded (yes), we'll need to modify the launcher
# itself.
state_machine = self.adapter.getDeleteStateMachine(
external_id)
self.delete_state_machines[external_id] = state_machine
# This is no longer used due to our custom NodeDeleter
raise NotImplementedError()
def waitForNodeCleanup(self, external_id, timeout=600):
try:
for count in iterate_timeout(
timeout, exceptions.ServerDeleteException,
"server %s deletion" % external_id):
sm = self.delete_state_machines[external_id]
sm.advance()
self.log.debug(f"State machine for {external_id} at "
f"{sm.state}")
if sm.complete:
return
finally:
self.delete_state_machines.pop(external_id, None)
# This is no longer used due to our custom NodeDeleter
raise NotImplementedError()
def cleanupLeakedResources(self):
known_nodes = set()

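StateMachineNodeDeleter above delegates the actual cloud work to an adapter-provided delete state machine; the interface it relies on, inferred from the calls shown (getDeleteStateMachine, advance, state, complete), looks roughly like this illustrative sketch, which is not part of the change itself:

class ExampleDeleteStateMachine:
    START = 'start'

    def __init__(self, external_id):
        self.external_id = external_id
        self.state = self.START     # read by the debug log above
        self.complete = False       # polled by the runner thread

    def advance(self):
        # Called repeatedly by StateMachineProvider._runStateMachines until
        # complete is True; may raise nodepool.exceptions.NotFound if the
        # instance is already gone (handled by the deleter above).
        self.state = 'deleting'
        # ... issue or poll the cloud API delete here ...
        self.complete = True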

@ -24,6 +24,7 @@ from nodepool import exceptions
from nodepool import nodeutils
from nodepool import zk
from nodepool.driver import Provider
from nodepool.driver.utils import NodeDeleter
from nodepool.driver.static.handler import StaticNodeRequestHandler
@ -389,6 +390,11 @@ class StaticNodeProvider(Provider):
for n in p.nodes
}
def startNodeCleanup(self, node):
t = NodeDeleter(self.zk, self, node)
t.start()
return t
def cleanupNode(self, server_id):
return True


@ -16,6 +16,7 @@
from nodepool.driver import Provider
from nodepool.driver.test import handler
from nodepool.driver.utils import NodeDeleter
class TestProvider(Provider):
@ -23,7 +24,7 @@ class TestProvider(Provider):
self.provider = provider
def start(self, zk_conn):
pass
self._zk = zk_conn
def stop(self):
pass
@ -34,6 +35,11 @@ class TestProvider(Provider):
def labelReady(self, name):
return True
def startNodeCleanup(self, node):
t = NodeDeleter(self._zk, self, node)
t.start()
return t
def cleanupNode(self, node_id):
pass


@ -109,6 +109,73 @@ class NodeLauncher(threading.Thread,
self.log.exception("Exception while reporting stats:")
class NodeDeleter(threading.Thread):
log = logging.getLogger("nodepool.NodeDeleter")
def __init__(self, zk, provider_manager, node):
threading.Thread.__init__(self, name='NodeDeleter for %s %s' %
(node.provider, node.external_id))
self._zk = zk
self._provider_manager = provider_manager
self._node = node
@staticmethod
def delete(zk_conn, manager, node, node_exists=True):
'''
Delete a server instance and ZooKeeper node.
This is a static method so we can support instantaneous deletes.
:param ZooKeeper zk_conn: A ZooKeeper object to use.
:param ProviderManager manager: ProviderManager object to
use for deleting the server.
:param Node node: A locked Node object that describes the server to
delete.
:param bool node_exists: True if the node actually exists in ZooKeeper.
An artificial Node object can be passed to delete a leaked
instance.
'''
try:
node.state = zk.DELETING
zk_conn.storeNode(node)
if node.external_id:
manager.cleanupNode(node.external_id)
manager.waitForNodeCleanup(node.external_id)
except exceptions.NotFound:
NodeDeleter.log.info("Instance %s not found in provider %s",
node.external_id, node.provider)
except Exception:
NodeDeleter.log.exception(
"Exception deleting instance %s from %s:",
node.external_id, node.provider)
# Don't delete the ZK node in this case, but do unlock it
if node_exists:
zk_conn.unlockNode(node)
return
if node_exists:
NodeDeleter.log.info(
"Deleting ZK node id=%s, state=%s, external_id=%s",
node.id, node.state, node.external_id)
# This also effectively releases the lock
zk_conn.deleteNode(node)
manager.nodeDeletedNotification(node)
def run(self):
# Since leaked instances won't have an actual node in ZooKeeper,
# we need to check 'id' to see if this is an artificial Node.
if self._node.id is None:
node_exists = False
else:
node_exists = True
try:
self.delete(self._zk, self._provider_manager,
self._node, node_exists)
except Exception:
self.log.exception("Error deleting node %s:", self._node)
class QuotaInformation:
def __init__(self, cores=None, instances=None, ram=None, default=0):

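With the class moved here, both usage patterns from the old launcher remain available; a brief sketch using the names from the docstring above (zk_conn, manager and a locked node are assumed to exist):

# Asynchronous (what the drivers' startNodeCleanup() does): build and
# start the thread; its run() method then calls delete() for us.
deleter = NodeDeleter(zk_conn, manager, node)
deleter.start()

# Synchronous ("instantaneous") use of the static helper, e.g. from code
# that already holds the node lock and wants to block until cleanup ends.
NodeDeleter.delete(zk_conn, manager, node)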

@ -45,73 +45,6 @@ LOCK_CLEANUP = 8 * HOURS
SUSPEND_WAIT_TIME = 30
class NodeDeleter(threading.Thread):
log = logging.getLogger("nodepool.NodeDeleter")
def __init__(self, zk, provider_manager, node):
threading.Thread.__init__(self, name='NodeDeleter for %s %s' %
(node.provider, node.external_id))
self._zk = zk
self._provider_manager = provider_manager
self._node = node
@staticmethod
def delete(zk_conn, manager, node, node_exists=True):
'''
Delete a server instance and ZooKeeper node.
This is a static method so we can support instantaneous deletes.
:param ZooKeeper zk_conn: A ZooKeeper object to use.
:param ProviderManager manager: ProviderManager object to
use for deleting the server.
:param Node node: A locked Node object that describes the server to
delete.
:param bool node_exists: True if the node actually exists in ZooKeeper.
An artificial Node object can be passed to delete a leaked
instance.
'''
try:
node.state = zk.DELETING
zk_conn.storeNode(node)
if node.external_id:
manager.cleanupNode(node.external_id)
manager.waitForNodeCleanup(node.external_id)
except exceptions.NotFound:
NodeDeleter.log.info("Instance %s not found in provider %s",
node.external_id, node.provider)
except Exception:
NodeDeleter.log.exception(
"Exception deleting instance %s from %s:",
node.external_id, node.provider)
# Don't delete the ZK node in this case, but do unlock it
if node_exists:
zk_conn.unlockNode(node)
return
if node_exists:
NodeDeleter.log.info(
"Deleting ZK node id=%s, state=%s, external_id=%s",
node.id, node.state, node.external_id)
# This also effectively releases the lock
zk_conn.deleteNode(node)
manager.nodeDeletedNotification(node)
def run(self):
# Since leaked instances won't have an actual node in ZooKeeper,
# we need to check 'id' to see if this is an artificial Node.
if self._node.id is None:
node_exists = False
else:
node_exists = True
try:
self.delete(self._zk, self._provider_manager,
self._node, node_exists)
except Exception:
self.log.exception("Error deleting node %s:", self._node)
class PoolWorker(threading.Thread, stats.StatsReporter):
'''
Class that manages node requests for a single provider pool.
@ -698,11 +631,8 @@ class DeletedNodeWorker(BaseCleanupWorker):
self.log.info("Deleting %s instance %s from %s",
node.state, node.external_id, node.provider)
try:
t = NodeDeleter(
self._nodepool.getZK(),
self._nodepool.getProviderManager(node.provider),
node)
t.start()
pm = self._nodepool.getProviderManager(node.provider)
pm.startNodeCleanup(node)
except Exception:
self.log.exception("Could not delete instance %s on provider %s",
node.external_id, node.provider)


@ -975,8 +975,10 @@ class TestLauncher(tests.DBTestCase):
self.assertEqual(len(nodes), 1)
self.zk.lockNode(nodes[0], blocking=False)
nodepool.launcher.NodeDeleter.delete(
self.zk, pool.getProviderManager('fake-provider'), nodes[0])
pm = pool.getProviderManager('fake-provider')
node_deleter = pm.startNodeCleanup(nodes[0])
node_deleter.join()
# Make sure our old node is in delete state, even though delete failed
deleted_node = self.zk.getNode(nodes[0].id)