Slow processing of failed nodescan connections
The nodescan processing loop iterates over the state machine for every pending request and attempts to advance it every time the loop runs. The loop runs either every second or every time a socket is ready for io. If a connection is failing quickly, we attempt to avoid a busy-wait by removing its socket and resetting the state machine to START so that it will only open a new connection after the 1 second timeout. But if there are a lot of connections (or, perhaps even just more than one that is failing) then we will go through the loop quickly anyway, and we may effectively busy-wait. To avoid this, we will process all connections with a ready socket any time they are ready. But we will process the remaining connections only once every second. An exception is made for the case where we can move a request from the pending queue to the activ queue so we don't unecessarily delay the start of processing. Additionally, if a connection fails during the first key exchange, reset to START since it's still safe to do so, and we will get the benefit of the extra delay. Any connection failures after the first key exchange will still re-connect immediately. Change-Id: Ic3f5b223fafd805d2357171e7822e0e33bd104bb
This commit is contained in:
@ -125,7 +125,7 @@
|
||||
abstract: true
|
||||
connection: aws
|
||||
host-key-checking: true
|
||||
boot-timeout: 1
|
||||
boot-timeout: 2
|
||||
launch-timeout: 600
|
||||
launch-attempts: 2
|
||||
object-storage:
|
||||
|
@ -456,13 +456,11 @@ class NodescanRequest:
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
f"SSH error connecting to {self.ip} on port {self.port}")
|
||||
# Try again
|
||||
# Try again; go to start to avoid busy-waiting.
|
||||
self._close()
|
||||
self.key_negotiation_failures += 1
|
||||
self.state = self.START
|
||||
self._checkTimeout()
|
||||
self._connect()
|
||||
self.state = self.CONNECTING_INIT
|
||||
return
|
||||
# This is our first successful connection. Now that
|
||||
# we've done it, start again specifying the first key
|
||||
@ -625,6 +623,11 @@ class NodescanWorker:
|
||||
pass
|
||||
|
||||
def run(self):
|
||||
# To avoid busy waiting, we will process any request with a
|
||||
# ready socket, but only process the remaining requests
|
||||
# periodically. This value is the last time we processed
|
||||
# those unready requests.
|
||||
last_unready_check = 0
|
||||
while self._running:
|
||||
# Set the poll timeout to 1 second so that we check all
|
||||
# requests for timeouts every second. This could be
|
||||
@ -638,6 +641,7 @@ class NodescanWorker:
|
||||
request = self._pending_requests.pop(0)
|
||||
self._active_requests.append(request)
|
||||
timeout = 0
|
||||
last_unready_check = 0
|
||||
ready = self.poll.poll(timeout=timeout)
|
||||
ready = [x[0] for x in ready]
|
||||
if self.wake_read in ready:
|
||||
@ -647,15 +651,20 @@ class NodescanWorker:
|
||||
os.read(self.wake_read, 1024)
|
||||
except BlockingIOError:
|
||||
break
|
||||
process_unready = time.monotonic() - last_unready_check > 1.0
|
||||
for request in self._active_requests:
|
||||
try:
|
||||
socket_ready = (request.sock and
|
||||
request.sock.fileno() in ready)
|
||||
request.advance(socket_ready)
|
||||
event_ready = request.event and request.event.is_set()
|
||||
if process_unready or socket_ready or event_ready:
|
||||
request.advance(socket_ready)
|
||||
except Exception as e:
|
||||
request.fail(e)
|
||||
if request.complete:
|
||||
self.removeRequest(request)
|
||||
if process_unready:
|
||||
last_unready_check = time.monotonic()
|
||||
|
||||
|
||||
class Launcher:
|
||||
|
Reference in New Issue
Block a user