Offload waiting for server creation/deletion
Currently nodepool has one thread per server creation or deletion. Each of those waits for the cloud by regularly getting the server list and checking whether its instance is active or gone. On a busy nodepool this leads to severe thread contention when the server list gets large and/or there are many parallel creations/deletions in progress. This can be improved by offloading the waiting to a single thread that regularly retrieves the server list and compares it to the list of waiting server creates/deletes. The calling threads then wait until the central thread wakes them up to proceed with their task. The waiting threads wait on the event outside of the GIL and thus no longer contribute to the thread contention problem. An alternative approach would be to redesign the threading to use fewer threads, but that would be a much more complex redesign. Thus this change keeps the many-threads approach but makes their waiting much lighter-weight, which shows a substantial improvement during load testing in a test environment. Change-Id: I5525f2558a4f08a455f72e6b5479f27684471dc7
This commit is contained in:
parent
d642e14cc6
commit
2e59f7b0b3
|
@ -282,7 +282,7 @@ class FakeOpenStackCloud(object):
|
|||
server = self._clean_floating_ip(server)
|
||||
return server
|
||||
|
||||
def list_servers(self, bare=False):
    """Return the fake server list.

    ``bare`` mirrors openstacksdk's ``list_servers(bare=...)`` parameter
    (skip per-server detail lookups); the fake keeps no extra detail, so
    the flag is accepted for signature compatibility and ignored.
    """
    return self._server_list
|
||||
|
||||
def delete_server(self, name_or_id, delete_ips=True):
|
||||
|
|
|
@ -17,14 +17,15 @@
|
|||
import logging
|
||||
import operator
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
import openstack
|
||||
from openstack.exceptions import ResourceTimeout
|
||||
|
||||
from nodepool import exceptions
|
||||
from nodepool.driver import Provider
|
||||
from nodepool.driver.utils import QuotaInformation, QuotaSupport
|
||||
from nodepool.nodeutils import iterate_timeout
|
||||
from nodepool import stats
|
||||
from nodepool import version
|
||||
from nodepool import zk
|
||||
|
@ -50,16 +51,26 @@ class OpenStackProvider(Provider, QuotaSupport):
|
|||
self._down_ports = set()
|
||||
self._last_port_cleanup = None
|
||||
self._statsd = stats.get_client()
|
||||
self.running = False
|
||||
self._server_list_watcher = threading.Thread(
|
||||
name='ServerListWatcher', target=self._watchServerList,
|
||||
daemon=True)
|
||||
self._server_list_watcher_stop_event = threading.Event()
|
||||
self._cleanup_queue = {}
|
||||
self._startup_queue = {}
|
||||
|
||||
def start(self, zk_conn):
    """Bring the provider online.

    Resets the cloud client, records the ZooKeeper connection and
    launches the central server-list watcher thread that wakes the
    per-server create/delete waiters.
    """
    self.resetClient()
    self._zk = zk_conn
    # The watcher loop checks self.running, so it must be set before
    # the thread starts.
    self.running = True
    self._server_list_watcher.start()
|
||||
|
||||
def stop(self):
    """Signal shutdown.

    Clears the running flag and sets the stop event so the watcher
    loop exits and wakes all threads still waiting on create/delete
    events.  (Removes the leftover ``pass`` from the old no-op body.)
    """
    self.running = False
    self._server_list_watcher_stop_event.set()
|
||||
|
||||
def join(self):
    """Block until the server-list watcher thread has terminated.

    (Removes the leftover ``pass`` from the old no-op body.)
    """
    self._server_list_watcher.join()
|
||||
|
||||
def getRequestHandler(self, poolworker, request):
    """Return the OpenStack-specific handler for a node request."""
    request_handler = handler.OpenStackNodeRequestHandler(
        poolworker, request)
    return request_handler
|
||||
|
@ -303,17 +314,39 @@ class OpenStackProvider(Provider, QuotaSupport):
|
|||
return None
|
||||
|
||||
def waitForServer(self, server, timeout=3600, auto_ip=True):
    """Wait for a freshly created server to reach ACTIVE or ERROR.

    This method is called from a separate thread per server.  To reduce
    thread contention we don't call wait_for_server right away; instead
    this thread sleeps on an Event until the central server-list
    watcher (_watchServerList) observes the instance in ACTIVE or ERROR
    state.  Only then do we fall through to wait_for_server, which
    finishes the remaining work (e.g. floating IPs).

    :param server: the server object returned by the create call
    :param timeout: overall timeout in seconds
    :param auto_ip: passed through to wait_for_server
    :raises ResourceTimeout: if the server does not come up in time
    """
    # NOTE(review): no zk-request log annotation available here yet.
    self.log.debug('Wait for central server creation %s', server.id)
    event = threading.Event()
    start_time = time.monotonic()
    self._startup_queue[server.id] = (event, start_time + timeout)
    if not event.wait(timeout=timeout):
        # Drop our queue entry eagerly instead of leaving it for the
        # watcher's periodic timeout sweep.
        self._startup_queue.pop(server.id, None)
        # On timeout emit the same exception wait_for_server would.
        raise ResourceTimeout("Timeout waiting for the server to come up.")

    self.log.debug('Finished wait for central server creation %s',
                   server.id)

    # Re-calculate the timeout to account for the time already spent
    # waiting on the event.
    remaining = max(0, timeout - (time.monotonic() - start_time))
    return self._client.wait_for_server(
        server=server, auto_ip=auto_ip,
        reuse=False, timeout=remaining)
|
||||
|
||||
def waitForNodeCleanup(self, server_id, timeout=600):
    """Wait until the cloud reports the server as deleted.

    Instead of polling getServer per deleter thread (the superseded
    loop the diff left behind), park on an Event that the central
    server-list watcher (_watchServerList) sets once the server is no
    longer present (or is DELETED) in the server list.

    :param server_id: id of the server being deleted
    :param timeout: seconds to wait before giving up
    :raises exceptions.ServerDeleteException: on timeout
    """
    event = threading.Event()
    self._cleanup_queue[server_id] = (event, time.monotonic() + timeout)
    if not event.wait(timeout=timeout):
        # Remove our entry so the watcher stops tracking a request
        # nobody is waiting on anymore.
        self._cleanup_queue.pop(server_id, None)
        raise exceptions.ServerDeleteException(
            "server %s deletion" % server_id)
|
||||
|
||||
def createImage(self, server, image_name, meta):
|
||||
return self._client.create_image_snapshot(
|
||||
|
@ -546,3 +579,71 @@ class OpenStackProvider(Provider, QuotaSupport):
|
|||
# ability to turn off random portions of the OpenStack API.
|
||||
self.__azs = [None]
|
||||
return self.__azs
|
||||
|
||||
def _watchServerList(self):
|
||||
log = logging.getLogger(
|
||||
"nodepool.driver.openstack.OpenStackProvider.watcher")
|
||||
while self.running:
|
||||
if self._server_list_watcher_stop_event.wait(5):
|
||||
# We're stopping now so don't wait with any thread for node
|
||||
# deletion.
|
||||
for event, _ in self._cleanup_queue.values():
|
||||
event.set()
|
||||
for event, _ in self._startup_queue.values():
|
||||
event.set()
|
||||
break
|
||||
|
||||
if not self._cleanup_queue and not self._startup_queue:
|
||||
# No server deletion to wait for so check can be skipped
|
||||
continue
|
||||
|
||||
try:
|
||||
log.debug('Get server list')
|
||||
start = time.monotonic()
|
||||
# List bare to avoid neutron calls
|
||||
servers = self._client.list_servers(bare=True)
|
||||
log.debug('Got server list in %.3fs', time.monotonic() - start)
|
||||
except Exception:
|
||||
log.exception('Failed to get server list')
|
||||
continue
|
||||
|
||||
def process_timeouts(queue):
|
||||
for server_id in list(queue.keys()):
|
||||
# Remove entries that are beyond timeout
|
||||
_, timeout = queue[server_id]
|
||||
if time.monotonic() > timeout:
|
||||
del queue[server_id]
|
||||
|
||||
# Process cleanup queue
|
||||
existing_server_ids = {
|
||||
server.id for server in servers
|
||||
if server.status != 'DELETED'
|
||||
}
|
||||
for server_id in list(self._cleanup_queue.keys()):
|
||||
# Notify waiting threads that don't have server ids
|
||||
if server_id not in existing_server_ids:
|
||||
# Notify the thread which is waiting for the delete
|
||||
log.debug('Waking up cleanup thread for server %s',
|
||||
server_id)
|
||||
self._cleanup_queue[server_id][0].set()
|
||||
del self._cleanup_queue[server_id]
|
||||
|
||||
# Process startup queue
|
||||
finished_server_ids = {
|
||||
server.id for server in servers
|
||||
if server.status in ('ACTIVE', 'ERROR')
|
||||
}
|
||||
for server_id in list(self._startup_queue.keys()):
|
||||
# Notify waiting threads that don't have server ids
|
||||
if server_id in finished_server_ids:
|
||||
# Notify the thread which is waiting for the delete
|
||||
log.debug('Waking up startup thread for server %s',
|
||||
server_id)
|
||||
self._startup_queue[server_id][0].set()
|
||||
del self._startup_queue[server_id]
|
||||
|
||||
# Process timeouts
|
||||
process_timeouts(self._cleanup_queue)
|
||||
process_timeouts(self._startup_queue)
|
||||
|
||||
log.debug('Done')
|
||||
|
|
|
@ -153,6 +153,7 @@ class BaseTestCase(testtools.TestCase):
|
|||
test_timeout = 0
|
||||
if test_timeout > 0:
|
||||
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
|
||||
self.useFixture(fixtures.Timeout(test_timeout + 20, gentle=False))
|
||||
|
||||
if os.environ.get('OS_STDOUT_CAPTURE') in TRUE_VALUES:
|
||||
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
|
||||
|
@ -221,6 +222,7 @@ class BaseTestCase(testtools.TestCase):
|
|||
'fake-provider3',
|
||||
'CleanupWorker',
|
||||
'DeletedNodeWorker',
|
||||
'ServerListWatcher',
|
||||
'StatsWorker',
|
||||
'pydevd.CommandThread',
|
||||
'pydevd.Reader',
|
||||
|
|
Loading…
Reference in New Issue