Split DeleteNodeWorker into two threads
After some discussion, it was decided to create a 2nd thread specifically to cleanup our nodes, which could be less agressive then our DeleteNodeWorker interval. This will reduce the pressure we place on clouds looking for leaked nodes. Change-Id: I3f1a482eaa43ea7943cfa5d8b74530cd34d251b3 Signed-off-by: Paul Belanger <pabelanger@redhat.com>
This commit is contained in:
parent
3f8c35397f
commit
7d2c51f164
|
@ -1041,6 +1041,27 @@ class BaseCleanupWorker(threading.Thread):
|
|||
self._interval = interval
|
||||
self._running = False
|
||||
|
||||
def _deleteInstance(self, node):
|
||||
'''
|
||||
Delete an instance from a provider.
|
||||
|
||||
A thread will be spawned to delete the actual instance from the
|
||||
provider.
|
||||
|
||||
:param Node node: A Node object representing the instance to delete.
|
||||
'''
|
||||
self.log.info("Deleting %s instance %s from %s",
|
||||
node.state, node.external_id, node.provider)
|
||||
try:
|
||||
t = InstanceDeleter(
|
||||
self._nodepool.getZK(),
|
||||
self._nodepool.getProviderManager(node.provider),
|
||||
node)
|
||||
t.start()
|
||||
except Exception:
|
||||
self.log.exception("Could not delete instance %s on provider %s",
|
||||
node.external_id, node.provider)
|
||||
|
||||
def run(self):
|
||||
self.log.info("Starting")
|
||||
self._running = True
|
||||
|
@ -1062,11 +1083,11 @@ class BaseCleanupWorker(threading.Thread):
|
|||
self.join()
|
||||
|
||||
|
||||
class DeletedNodeWorker(BaseCleanupWorker):
|
||||
class CleanupWorker(BaseCleanupWorker):
|
||||
def __init__(self, nodepool, interval):
|
||||
super(DeletedNodeWorker, self).__init__(
|
||||
nodepool, interval, name='DeletedNodeWorker')
|
||||
self.log = logging.getLogger("nodepool.DeletedNodeWorker")
|
||||
super(CleanupWorker, self).__init__(
|
||||
nodepool, interval, name='CleanupWorker')
|
||||
self.log = logging.getLogger("nodepool.CleanupWorker")
|
||||
|
||||
def _resetLostRequest(self, zk_conn, req):
|
||||
'''
|
||||
|
@ -1141,26 +1162,59 @@ class DeletedNodeWorker(BaseCleanupWorker):
|
|||
if (now - lock.stat.mtime/1000) > LOCK_CLEANUP:
|
||||
zk.deleteNodeRequestLock(lock.id)
|
||||
|
||||
def _deleteInstance(self, node):
|
||||
def _cleanupLeakedInstances(self):
|
||||
'''
|
||||
Delete an instance from a provider.
|
||||
Delete any leaked server instances.
|
||||
|
||||
A thread will be spawned to delete the actual instance from the
|
||||
provider.
|
||||
|
||||
:param Node node: A Node object representing the instance to delete.
|
||||
Remove any servers we find in providers we know about that are not
|
||||
recorded in the ZooKeeper data.
|
||||
'''
|
||||
self.log.info("Deleting %s instance %s from %s",
|
||||
node.state, node.external_id, node.provider)
|
||||
zk_conn = self._nodepool.getZK()
|
||||
|
||||
for provider in self._nodepool.config.providers.values():
|
||||
manager = self._nodepool.getProviderManager(provider.name)
|
||||
|
||||
for server in manager.listServers():
|
||||
meta = server.get('metadata', {})
|
||||
|
||||
if 'nodepool_provider_name' not in meta:
|
||||
continue
|
||||
|
||||
if meta['nodepool_provider_name'] != provider.name:
|
||||
# Another launcher, sharing this provider but configured
|
||||
# with a different name, owns this.
|
||||
continue
|
||||
|
||||
if not zk_conn.getNode(meta['nodepool_node_id']):
|
||||
self.log.warning(
|
||||
"Deleting leaked instance %s (%s) in %s "
|
||||
"(unknown node id %s)",
|
||||
server.name, server.id, provider.name,
|
||||
meta['nodepool_node_id']
|
||||
)
|
||||
# Create an artifical node to use for deleting the server.
|
||||
node = zk.Node()
|
||||
node.external_id = server.id
|
||||
node.provider = provider.name
|
||||
self._deleteInstance(node)
|
||||
|
||||
if provider.clean_floating_ips:
|
||||
manager.cleanupLeakedFloaters()
|
||||
|
||||
def _run(self):
|
||||
try:
|
||||
t = InstanceDeleter(
|
||||
self._nodepool.getZK(),
|
||||
self._nodepool.getProviderManager(node.provider),
|
||||
node)
|
||||
t.start()
|
||||
self._cleanupNodeRequestLocks()
|
||||
self._cleanupLeakedInstances()
|
||||
self._cleanupLostRequests()
|
||||
except Exception:
|
||||
self.log.exception("Could not delete instance %s on provider %s",
|
||||
node.external_id, node.provider)
|
||||
self.log.exception("Exception in DeletedNodeWorker:")
|
||||
|
||||
|
||||
class DeletedNodeWorker(BaseCleanupWorker):
|
||||
def __init__(self, nodepool, interval):
|
||||
super(DeletedNodeWorker, self).__init__(
|
||||
nodepool, interval, name='DeletedNodeWorker')
|
||||
self.log = logging.getLogger("nodepool.DeletedNodeWorker")
|
||||
|
||||
def _cleanupNodes(self):
|
||||
'''
|
||||
|
@ -1212,51 +1266,9 @@ class DeletedNodeWorker(BaseCleanupWorker):
|
|||
# node from ZooKeeper if it succeeds.
|
||||
self._deleteInstance(node)
|
||||
|
||||
def _cleanupLeakedInstances(self):
|
||||
'''
|
||||
Delete any leaked server instances.
|
||||
|
||||
Remove any servers we find in providers we know about that are not
|
||||
recorded in the ZooKeeper data.
|
||||
'''
|
||||
zk_conn = self._nodepool.getZK()
|
||||
|
||||
for provider in self._nodepool.config.providers.values():
|
||||
manager = self._nodepool.getProviderManager(provider.name)
|
||||
|
||||
for server in manager.listServers():
|
||||
meta = server.get('metadata', {})
|
||||
|
||||
if 'nodepool_provider_name' not in meta:
|
||||
continue
|
||||
|
||||
if meta['nodepool_provider_name'] != provider.name:
|
||||
# Another launcher, sharing this provider but configured
|
||||
# with a different name, owns this.
|
||||
continue
|
||||
|
||||
if not zk_conn.getNode(meta['nodepool_node_id']):
|
||||
self.log.warning(
|
||||
"Deleting leaked instance %s (%s) in %s "
|
||||
"(unknown node id %s)",
|
||||
server.name, server.id, provider.name,
|
||||
meta['nodepool_node_id']
|
||||
)
|
||||
# Create an artifical node to use for deleting the server.
|
||||
node = zk.Node()
|
||||
node.external_id = server.id
|
||||
node.provider = provider.name
|
||||
self._deleteInstance(node)
|
||||
|
||||
if provider.clean_floating_ips:
|
||||
manager.cleanupLeakedFloaters()
|
||||
|
||||
def _run(self):
|
||||
try:
|
||||
self._cleanupNodeRequestLocks()
|
||||
self._cleanupNodes()
|
||||
self._cleanupLeakedInstances()
|
||||
self._cleanupLostRequests()
|
||||
except Exception:
|
||||
self.log.exception("Exception in DeletedNodeWorker:")
|
||||
|
||||
|
@ -1270,12 +1282,14 @@ class NodePool(threading.Thread):
|
|||
self.securefile = securefile
|
||||
self.configfile = configfile
|
||||
self.watermark_sleep = watermark_sleep
|
||||
self.cleanup_interval = 60
|
||||
self.delete_interval = 5
|
||||
self._stopped = False
|
||||
self.config = None
|
||||
self.zk = None
|
||||
self.statsd = stats.get_client()
|
||||
self._provider_threads = {}
|
||||
self._cleanup_thread = None
|
||||
self._delete_thread = None
|
||||
self._wake_condition = threading.Condition()
|
||||
self._submittedRequests = {}
|
||||
|
@ -1288,6 +1302,10 @@ class NodePool(threading.Thread):
|
|||
if self.config:
|
||||
provider_manager.ProviderManager.stopProviders(self.config)
|
||||
|
||||
if self._cleanup_thread:
|
||||
self._cleanup_thread.stop()
|
||||
self._cleanup_thread.join()
|
||||
|
||||
if self._delete_thread:
|
||||
self._delete_thread.stop()
|
||||
self._delete_thread.join()
|
||||
|
@ -1467,6 +1485,11 @@ class NodePool(threading.Thread):
|
|||
|
||||
self.createMinReady()
|
||||
|
||||
if not self._cleanup_thread:
|
||||
self._cleanup_thread = CleanupWorker(
|
||||
self, self.cleanup_interval)
|
||||
self._cleanup_thread.start()
|
||||
|
||||
if not self._delete_thread:
|
||||
self._delete_thread = DeletedNodeWorker(
|
||||
self, self.delete_interval)
|
||||
|
|
|
@ -164,6 +164,7 @@ class BaseTestCase(testtools.TestCase):
|
|||
'fake-provider1',
|
||||
'fake-provider2',
|
||||
'fake-provider3',
|
||||
'CleanupWorker',
|
||||
'DeletedNodeWorker',
|
||||
]
|
||||
|
||||
|
@ -397,6 +398,7 @@ class DBTestCase(BaseTestCase):
|
|||
def useNodepool(self, *args, **kwargs):
|
||||
args = (self.secure_conf,) + args
|
||||
pool = nodepool.NodePool(*args, **kwargs)
|
||||
pool.cleanup_interval = .5
|
||||
pool.delete_interval = .5
|
||||
self.addCleanup(pool.stop)
|
||||
return pool
|
||||
|
|
Loading…
Reference in New Issue