Merge "Offload waiting for server creation/deletion"

Zuul 2021-03-06 06:09:49 +00:00 committed by Gerrit Code Review
commit 74d299ec01
3 changed files with 113 additions and 10 deletions


@@ -282,7 +282,7 @@ class FakeOpenStackCloud(object):
server = self._clean_floating_ip(server)
return server
def list_servers(self):
def list_servers(self, bare=False):
return self._server_list
def delete_server(self, name_or_id, delete_ips=True):


@@ -17,14 +17,15 @@
import logging
import operator
import os
import threading
import time
import openstack
from openstack.exceptions import ResourceTimeout
from nodepool import exceptions
from nodepool.driver import Provider
from nodepool.driver.utils import QuotaInformation, QuotaSupport
from nodepool.nodeutils import iterate_timeout
from nodepool import stats
from nodepool import version
from nodepool import zk
@@ -50,16 +51,26 @@ class OpenStackProvider(Provider, QuotaSupport):
self._down_ports = set()
self._last_port_cleanup = None
self._statsd = stats.get_client()
self.running = False
self._server_list_watcher = threading.Thread(
name='ServerListWatcher', target=self._watchServerList,
daemon=True)
self._server_list_watcher_stop_event = threading.Event()
self._cleanup_queue = {}
self._startup_queue = {}
def start(self, zk_conn):
self.resetClient()
self._zk = zk_conn
self.running = True
self._server_list_watcher.start()
def stop(self):
pass
self.running = False
self._server_list_watcher_stop_event.set()
def join(self):
pass
self._server_list_watcher.join()
def getRequestHandler(self, poolworker, request):
return handler.OpenStackNodeRequestHandler(poolworker, request)
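
The start/stop/join changes above wire the new watcher into the provider lifecycle: a daemon thread plus a stop event, so that shutdown wakes the loop immediately instead of waiting out the poll interval. A minimal, self-contained sketch of that pattern (the Watcher class below is illustrative, not the provider's actual code):

import threading


class Watcher:
    """Background worker with cooperative shutdown."""

    def __init__(self):
        self.running = False
        self._stop_event = threading.Event()
        # daemon=True so a stuck worker cannot block interpreter exit
        self._thread = threading.Thread(
            name='Watcher', target=self._run, daemon=True)

    def start(self):
        self.running = True
        self._thread.start()

    def stop(self):
        # Setting the event wakes the worker out of its wait right away,
        # so shutdown does not have to wait out the poll interval
        self.running = False
        self._stop_event.set()

    def join(self):
        self._thread.join()

    def _run(self):
        while self.running:
            # wait() doubles as the poll interval and returns True as
            # soon as stop() has been called
            if self._stop_event.wait(5):
                break
            # ... periodic work would go here ...
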
@@ -303,17 +314,39 @@ class OpenStackProvider(Provider, QuotaSupport):
return None
def waitForServer(self, server, timeout=3600, auto_ip=True):
# This method is called from a separate thread per server. In order to
# reduce thread contention we don't call wait_for_server right now
# but put this thread on sleep until the desired instance is either
# in ACTIVE or ERROR state. After that just continue with
# wait_for_server which will continue its magic.
# TODO: log annotation
self.log.debug('Wait for central server creation %s', server.id)
event = threading.Event()
start_time = time.monotonic()
self._startup_queue[server.id] = (event, start_time + timeout)
if not event.wait(timeout=timeout):
# On timeout emit the same exception as wait_for_server would
timeout_message = "Timeout waiting for the server to come up."
raise ResourceTimeout(timeout_message)
# TODO: log annotation
self.log.debug('Finished wait for central server creation %s',
server.id)
# Re-calculate timeout to account for the duration so far
elapsed = time.monotonic() - start_time
timeout = max(0, timeout - elapsed)
return self._client.wait_for_server(
server=server, auto_ip=auto_ip,
reuse=False, timeout=timeout)
def waitForNodeCleanup(self, server_id, timeout=600):
for count in iterate_timeout(
timeout, exceptions.ServerDeleteException,
"server %s deletion" % server_id):
server = self.getServer(server_id)
if not server or server.status == "DELETED":
return
event = threading.Event()
self._cleanup_queue[server_id] = (event, time.monotonic() + timeout)
if not event.wait(timeout=timeout):
raise exceptions.ServerDeleteException(
"server %s deletion" % server_id)
def createImage(self, server, image_name, meta):
return self._client.create_image_snapshot(
@@ -546,3 +579,71 @@ class OpenStackProvider(Provider, QuotaSupport):
# ability to turn off random portions of the OpenStack API.
self.__azs = [None]
return self.__azs
def _watchServerList(self):
log = logging.getLogger(
"nodepool.driver.openstack.OpenStackProvider.watcher")
while self.running:
if self._server_list_watcher_stop_event.wait(5):
# We're stopping now, so wake any threads that are still waiting
# for server creation or deletion.
for event, _ in self._cleanup_queue.values():
event.set()
for event, _ in self._startup_queue.values():
event.set()
break
if not self._cleanup_queue and not self._startup_queue:
# Nothing is waiting for creation or deletion so the check can be skipped
continue
try:
log.debug('Get server list')
start = time.monotonic()
# List bare to avoid neutron calls
servers = self._client.list_servers(bare=True)
log.debug('Got server list in %.3fs', time.monotonic() - start)
except Exception:
log.exception('Failed to get server list')
continue
def process_timeouts(queue):
for server_id in list(queue.keys()):
# Remove entries that are beyond timeout
_, timeout = queue[server_id]
if time.monotonic() > timeout:
del queue[server_id]
# Process cleanup queue
existing_server_ids = {
server.id for server in servers
if server.status != 'DELETED'
}
for server_id in list(self._cleanup_queue.keys()):
# Servers missing from the list have finished deleting
if server_id not in existing_server_ids:
# Notify the thread which is waiting for the delete
log.debug('Waking up cleanup thread for server %s',
server_id)
self._cleanup_queue[server_id][0].set()
del self._cleanup_queue[server_id]
# Process startup queue
finished_server_ids = {
server.id for server in servers
if server.status in ('ACTIVE', 'ERROR')
}
for server_id in list(self._startup_queue.keys()):
# Servers that have reached ACTIVE or ERROR have finished starting up
if server_id in finished_server_ids:
# Notify the thread which is waiting for the server to come up
log.debug('Waking up startup thread for server %s',
server_id)
self._startup_queue[server_id][0].set()
del self._startup_queue[server_id]
# Process timeouts
process_timeouts(self._cleanup_queue)
process_timeouts(self._startup_queue)
log.debug('Done')
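
Condensed to its essentials, the loop above performs one server list call per cycle and fans the result out to every registered waiter. A minimal sketch of a single watcher iteration (function and parameter names are illustrative; servers is assumed to be the list returned by the cloud client):

import time


def watch_once(servers, startup_queue, cleanup_queue):
    """One watcher iteration over a freshly fetched server list."""
    # Servers that finished booting, successfully or not
    finished = {s.id for s in servers if s.status in ('ACTIVE', 'ERROR')}
    # Servers that still exist, i.e. whose deletion is not done yet
    existing = {s.id for s in servers if s.status != 'DELETED'}

    for server_id in list(startup_queue):
        if server_id in finished:
            # Wake the thread waiting for this server to come up
            startup_queue.pop(server_id)[0].set()

    for server_id in list(cleanup_queue):
        if server_id not in existing:
            # Wake the thread waiting for this server's deletion
            cleanup_queue.pop(server_id)[0].set()

    # Drop waiters whose deadline has passed; their event.wait() times
    # out on its own and raises in the waiting thread
    now = time.monotonic()
    for queue in (startup_queue, cleanup_queue):
        for server_id, (_, deadline) in list(queue.items()):
            if now > deadline:
                del queue[server_id]

The design point is that the cost of listing servers is paid once per cycle regardless of how many create or delete operations are in flight, replacing the per-server polling that the old iterate_timeout loop performed.
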


@@ -172,6 +172,7 @@ class BaseTestCase(testtools.TestCase):
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
self.useFixture(fixtures.Timeout(test_timeout + 20, gentle=False))
if os.environ.get('OS_STDOUT_CAPTURE') in TRUE_VALUES:
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
@@ -240,6 +241,7 @@ class BaseTestCase(testtools.TestCase):
'fake-provider3',
'CleanupWorker',
'DeletedNodeWorker',
'ServerListWatcher',
'StatsWorker',
'pydevd.CommandThread',
'pydevd.Reader',