From b2e5fd3d35928bd97f4c3f5ed5c9efa6df4789bf Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 30 Jan 2025 14:52:12 -0800 Subject: [PATCH] Integrate nodescan worker into launcher This checks for connectivity and scans for ssh host keys on new nodes. Unlike nodepool, we will store the flag indicating whether we should scan for keys as well as the boot timeout on the node. This is because later in the node creation process, we no longer have easy access to the label with that configuration information (and it may change or no longer be present by the end). Semi-related: There are two locations in the launcher where we create new node objects; one of them was missing some optional initializers related to image information. It is updated so that both are at parity. Change-Id: I8ff7188cc8c8ae8f33ab1e92f3f5b0b91b9bfd41 --- tests/unit/test_cloud_driver.py | 6 ++++ tests/unit/test_launcher.py | 17 ++++++++- tests/unit/test_nodescan.py | 24 +++++++++---- zuul/launcher/server.py | 64 ++++++++++++++++++++++++++++----- zuul/model.py | 33 ++++++++++------- zuul/provider/schema.py | 2 ++ 6 files changed, 118 insertions(+), 28 deletions(-) diff --git a/tests/unit/test_cloud_driver.py b/tests/unit/test_cloud_driver.py index d2a869b17a..453c971690 100644 --- a/tests/unit/test_cloud_driver.py +++ b/tests/unit/test_cloud_driver.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import fixtures import testtools from kazoo.exceptions import NoNodeError @@ -29,6 +30,11 @@ class BaseCloudDriverTest(ZuulTestCase): cloud_test_image_format = '' cloud_test_provider_name = '' + def setUp(self): + self.useFixture(fixtures.MonkeyPatch( + 'zuul.launcher.server.NodescanRequest.FAKE', True)) + super().setUp() + def _getEndpoint(self): # Use the launcher provider so that we're using the same ttl # method caches. diff --git a/tests/unit/test_launcher.py b/tests/unit/test_launcher.py index 960baa3da9..349db353a4 100644 --- a/tests/unit/test_launcher.py +++ b/tests/unit/test_launcher.py @@ -21,6 +21,7 @@ from unittest import mock from zuul import model from zuul.launcher.client import LauncherClient +import fixtures import responses import testtools from kazoo.exceptions import NoNodeError @@ -127,6 +128,8 @@ class LauncherBaseTestCase(ZuulTestCase): self.mock_aws.start() # Must start responses after mock_aws self.useFixture(ImageMocksFixture()) + self.useFixture(fixtures.MonkeyPatch( + 'zuul.launcher.server.NodescanRequest.FAKE', True)) self.s3 = boto3.resource('s3', region_name='us-west-2') self.s3.create_bucket( Bucket='zuul', @@ -424,10 +427,20 @@ class TestLauncher(LauncherBaseTestCase): @simple_layout('layouts/nodepool.yaml', enable_nodepool=True) def test_jobs_executed(self): + self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') A.addApproval('Code-Review', 2) self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) self.waitUntilSettled() + + nodes = self.launcher.api.nodes_cache.getItems() + self.assertNotEqual(nodes[0].host_keys, []) + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + self.assertEqual(self.getJobFromHistory('check-job').result, 'SUCCESS') self.assertEqual(A.data['status'], 'MERGED') @@ -834,7 +847,9 @@ class TestMinReadyLauncher(LauncherBaseTestCase): if len(in_use_nodes) == 2: break - self.executor_server.hold_jobs_in_build = True + self.assertNotEqual(nodes[0].host_keys, []) + + self.executor_server.hold_jobs_in_build = False self.executor_server.release() self.waitUntilSettled() diff --git a/tests/unit/test_nodescan.py b/tests/unit/test_nodescan.py index 89afa6ed3e..3d1e948805 100644 --- a/tests/unit/test_nodescan.py +++ b/tests/unit/test_nodescan.py @@ -128,9 +128,11 @@ class TestNodescanWorker(BaseTestCase): interface_ip='198.51.100.1', connection_port=22, connection_type='ssh', + host_key_checking=True, + boot_timeout=300, ) worker.start() - request = NodescanRequest(node, True, 300, self.log) + request = NodescanRequest(node, self.log) worker.addRequest(request) for _ in iterate_timeout(30, 'waiting for nodescan'): if request.complete: @@ -156,9 +158,11 @@ class TestNodescanWorker(BaseTestCase): interface_ip='198.51.100.1', connection_port=22, connection_type='ssh', + host_key_checking=True, + boot_timeout=1, ) worker.start() - request = NodescanRequest(node, True, 1, self.log) + request = NodescanRequest(node, self.log) worker.addRequest(request) for _ in iterate_timeout(30, 'waiting for nodescan'): if request.complete: @@ -185,9 +189,11 @@ class TestNodescanWorker(BaseTestCase): interface_ip='198.51.100.1', connection_port=22, connection_type='ssh', + host_key_checking=True, + boot_timeout=1, ) worker.start() - request = NodescanRequest(node, True, 1, self.log) + request = NodescanRequest(node, self.log) worker.addRequest(request) for _ in iterate_timeout(30, 'waiting for nodescan'): if request.complete: @@ -215,9 +221,11 @@ class TestNodescanWorker(BaseTestCase): interface_ip='198.51.100.1', connection_port=22, connection_type='ssh', + host_key_checking=True, + boot_timeout=1, ) worker.start() - request = NodescanRequest(node, True, 1, self.log) + request = NodescanRequest(node, self.log) worker.addRequest(request) for _ in iterate_timeout(30, 'waiting for nodescan'): if request.complete: @@ -252,16 +260,20 @@ class TestNodescanWorker(BaseTestCase): interface_ip='198.51.100.1', connection_port=22, connection_type='ssh', + host_key_checking=True, + boot_timeout=300, ) node2 = DummyProviderNode() node2._set( interface_ip='198.51.100.2', connection_port=22, connection_type='ssh', + host_key_checking=True, + boot_timeout=300, ) - request1 = NodescanRequest(node1, True, 300, self.log) - request2 = NodescanRequest(node2, True, 300, self.log) + request1 = NodescanRequest(node1, self.log) + request2 = NodescanRequest(node2, self.log) worker.addRequest(request1) worker.addRequest(request2) worker.start() diff --git a/zuul/launcher/server.py b/zuul/launcher/server.py index d54f122910..afeefbba4c 100644 --- a/zuul/launcher/server.py +++ b/zuul/launcher/server.py @@ -1,5 +1,5 @@ # Copyright 2024 BMW Group -# Copyright 2024 Acme Gating, LLC +# Copyright 2024-2025 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -267,11 +267,11 @@ class NodescanRequest: # For unit testing FAKE = False - def __init__(self, node, host_key_checking, timeout, log): + def __init__(self, node, log): self.state = self.START self.node = node - self.host_key_checking = host_key_checking - self.timeout = timeout + self.host_key_checking = node.host_key_checking + self.timeout = node.boot_timeout self.log = log self.complete = False self.keys = [] @@ -605,6 +605,9 @@ class NodescanWorker: except ValueError: pass + def length(self): + return len(self._active_requests) + len(self._pending_requests) + def registerDescriptor(self, fd): """Register the fd with the poll object""" # Oneshot means that once it triggers, it will automatically @@ -734,6 +737,7 @@ class Launcher: self._imageUpdatedCallback ) + self.nodescan_worker = NodescanWorker() self.launcher_thread = threading.Thread( target=self.run, name="Launcher", @@ -940,6 +944,8 @@ class Launcher: label=label.name, label_config_hash=label.config_hash, max_ready_age=label.max_ready_age, + host_key_checking=label.host_key_checking, + boot_timeout=label.boot_timeout, request_id=request.uuid, zuul_event_id=request.zuul_event_id, connection_name=provider.connection_name, @@ -1108,15 +1114,46 @@ class Launcher: if not node.create_state_machine.complete: self.wake_event.set() return - self._updateNodeFromInstance(node, instance) - node.setState(node.State.READY) - self.wake_event.set() - log.debug("Marking node %s as %s", node, node.state) + # Note this method has the side effect of updating + # node info from the instance. + if self._checkNodescanRequest(node, instance): + node.setState(node.State.READY) + self.wake_event.set() + log.debug("Marking node %s as %s", node, node.state) node.releaseLock(ctx) + def _checkNodescanRequest(self, node, instance): + if node.nodescan_request is None: + # We just finished the create state machine, update with + # new info. + self._updateNodeFromInstance(node, instance) + node.nodescan_request = NodescanRequest(node, self.log) + self.nodescan_worker.addRequest(node.nodescan_request) + self.log.debug( + "Submitted nodescan request for %s queue length %s", + node.interface_ip, + self.nodescan_worker.length()) + if not node.nodescan_request.complete: + return False + try: + keys = node.nodescan_request.result() + except Exception as e: + if isinstance(e, exceptions.ConnectionTimeoutException): + self.log.warning("Error scanning keys: %s", str(e)) + else: + self.log.exception("Exception scanning keys:") + raise exceptions.LaunchKeyscanException( + "Can't scan key for %s" % (node,)) + if keys: + node.host_keys = keys + return True + def _cleanupNode(self, node, log): with self.createZKContext(node._lock, self.log) as ctx: with node.activeContext(ctx): + self.nodescan_worker.removeRequest(node.nodescan_request) + node.nodescan_request = None + if not node.delete_state_machine: log.debug("Cleaning up node %s", node) provider = self._getProviderForNode( @@ -1162,6 +1199,7 @@ class Launcher: node_uuid = uuid.uuid4().hex # We don't pass a provider here as the node should not # be directly associated with a tenant or provider. + image = provider.images[label.image] tags = provider.getNodeTags( self.system.system_id, label, node_uuid) node_class = provider.driver.getProviderNodeClass() @@ -1172,12 +1210,17 @@ class Launcher: label=label.name, label_config_hash=label.config_hash, max_ready_age=label.max_ready_age, + host_key_checking=label.host_key_checking, + boot_timeout=label.boot_timeout, request_id=None, connection_name=provider.connection_name, zuul_event_id=uuid.uuid4().hex, tenant_name=None, provider=None, tags=tags, + # Set any node attributes we already know here + connection_port=image.connection_port, + connection_type=image.connection_type, ) self.log.debug("Created min-ready node %s via provider %s", node, provider) @@ -1355,6 +1398,9 @@ class Launcher: self.command_thread.daemon = True self.command_thread.start() + self.log.debug("Starting nodescan worker") + self.nodescan_worker.start() + self.log.debug("Starting launcher thread") self.launcher_thread.start() @@ -1368,6 +1414,7 @@ class Launcher: self.connections.stop() self.upload_executor.shutdown() self.endpoint_upload_executor.shutdown() + self.nodescan_worker.stop() # Endpoints are stopped by drivers self.log.debug("Stopped launcher") @@ -1380,6 +1427,7 @@ class Launcher: self.api.stop() self.zk_client.disconnect() self.tracing.stop() + self.nodescan_worker.join() self.log.debug("Joined launcher") def runCommand(self): diff --git a/zuul/model.py b/zuul/model.py index 729834242a..4e22c78ddb 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -2468,6 +2468,8 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin, connection_name="", create_state={}, delete_state={}, + host_key_checking=None, + boot_timeout=None, # Node data host_id=None, interface_ip=None, @@ -2487,10 +2489,12 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin, resources=None, attributes={}, tenant_name=None, + host_keys=[], # Attributes that are not serialized is_locked=False, create_state_machine=None, delete_state_machine=None, + nodescan_request=None, # Attributes set by the launcher _lscores=None, ) @@ -2552,24 +2556,27 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin, def getNodeData(self): return dict( - host_id=self.host_id, - interface_ip=self.interface_ip, - public_ipv4=self.public_ipv4, - private_ipv4=self.private_ipv4, - public_ipv6=self.public_ipv6, - private_ipv6=self.private_ipv6, + attributes=self.attributes, + az=self.az, + boot_timeout=self.boot_timeout, + cloud=self.cloud, connection_port=self.connection_port, connection_type=self.connection_type, - slot=self.slot, - az=self.az, - cloud=self.cloud, - provider=self.provider, - region=self.region, - username=self.username, hold_expiration=self.hold_expiration, + host_id=self.host_id, + host_key_checking=self.host_key_checking, + host_keys=self.host_keys, + interface_ip=self.interface_ip, + private_ipv4=self.private_ipv4, + private_ipv6=self.private_ipv6, + provider=self.provider, + public_ipv4=self.public_ipv4, + public_ipv6=self.public_ipv6, + region=self.region, resources=self.resources, - attributes=self.attributes, + slot=self.slot, tenant_name=self.tenant_name, + username=self.username, ) diff --git a/zuul/provider/schema.py b/zuul/provider/schema.py index 139c858dd2..43bb280fad 100644 --- a/zuul/provider/schema.py +++ b/zuul/provider/schema.py @@ -34,11 +34,13 @@ base_label = vs.Schema({ Optional('tags', default=dict): {str: str}, Optional('min_ready', default=0): int, Optional('max_ready_age', default=0): int, + Optional('boot-timeout', default=300): int, }) # Label attributes that are common to any kind of ssh-based driver. ssh_label = vs.Schema({ Optional('key-name'): Nullable(str), + Optional('host-key-checking', default=True): bool, }) # Images