diff --git a/tests/unit/test_cloud_driver.py b/tests/unit/test_cloud_driver.py index d2a869b17a..453c971690 100644 --- a/tests/unit/test_cloud_driver.py +++ b/tests/unit/test_cloud_driver.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import fixtures import testtools from kazoo.exceptions import NoNodeError @@ -29,6 +30,11 @@ class BaseCloudDriverTest(ZuulTestCase): cloud_test_image_format = '' cloud_test_provider_name = '' + def setUp(self): + self.useFixture(fixtures.MonkeyPatch( + 'zuul.launcher.server.NodescanRequest.FAKE', True)) + super().setUp() + def _getEndpoint(self): # Use the launcher provider so that we're using the same ttl # method caches. diff --git a/tests/unit/test_launcher.py b/tests/unit/test_launcher.py index 960baa3da9..349db353a4 100644 --- a/tests/unit/test_launcher.py +++ b/tests/unit/test_launcher.py @@ -21,6 +21,7 @@ from unittest import mock from zuul import model from zuul.launcher.client import LauncherClient +import fixtures import responses import testtools from kazoo.exceptions import NoNodeError @@ -127,6 +128,8 @@ class LauncherBaseTestCase(ZuulTestCase): self.mock_aws.start() # Must start responses after mock_aws self.useFixture(ImageMocksFixture()) + self.useFixture(fixtures.MonkeyPatch( + 'zuul.launcher.server.NodescanRequest.FAKE', True)) self.s3 = boto3.resource('s3', region_name='us-west-2') self.s3.create_bucket( Bucket='zuul', @@ -424,10 +427,20 @@ class TestLauncher(LauncherBaseTestCase): @simple_layout('layouts/nodepool.yaml', enable_nodepool=True) def test_jobs_executed(self): + self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') A.addApproval('Code-Review', 2) self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) self.waitUntilSettled() + + nodes = self.launcher.api.nodes_cache.getItems() + self.assertNotEqual(nodes[0].host_keys, []) + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + self.assertEqual(self.getJobFromHistory('check-job').result, 'SUCCESS') self.assertEqual(A.data['status'], 'MERGED') @@ -834,7 +847,9 @@ class TestMinReadyLauncher(LauncherBaseTestCase): if len(in_use_nodes) == 2: break - self.executor_server.hold_jobs_in_build = True + self.assertNotEqual(nodes[0].host_keys, []) + + self.executor_server.hold_jobs_in_build = False self.executor_server.release() self.waitUntilSettled() diff --git a/tests/unit/test_nodescan.py b/tests/unit/test_nodescan.py index 89afa6ed3e..3d1e948805 100644 --- a/tests/unit/test_nodescan.py +++ b/tests/unit/test_nodescan.py @@ -128,9 +128,11 @@ class TestNodescanWorker(BaseTestCase): interface_ip='198.51.100.1', connection_port=22, connection_type='ssh', + host_key_checking=True, + boot_timeout=300, ) worker.start() - request = NodescanRequest(node, True, 300, self.log) + request = NodescanRequest(node, self.log) worker.addRequest(request) for _ in iterate_timeout(30, 'waiting for nodescan'): if request.complete: @@ -156,9 +158,11 @@ class TestNodescanWorker(BaseTestCase): interface_ip='198.51.100.1', connection_port=22, connection_type='ssh', + host_key_checking=True, + boot_timeout=1, ) worker.start() - request = NodescanRequest(node, True, 1, self.log) + request = NodescanRequest(node, self.log) worker.addRequest(request) for _ in iterate_timeout(30, 'waiting for nodescan'): if request.complete: @@ -185,9 +189,11 @@ class TestNodescanWorker(BaseTestCase): interface_ip='198.51.100.1', connection_port=22, connection_type='ssh', + host_key_checking=True, + boot_timeout=1, ) worker.start() - request = NodescanRequest(node, True, 1, self.log) + request = NodescanRequest(node, self.log) worker.addRequest(request) for _ in iterate_timeout(30, 'waiting for nodescan'): if request.complete: @@ -215,9 +221,11 @@ class TestNodescanWorker(BaseTestCase): interface_ip='198.51.100.1', connection_port=22, connection_type='ssh', + host_key_checking=True, + boot_timeout=1, ) worker.start() - request = NodescanRequest(node, True, 1, self.log) + request = NodescanRequest(node, self.log) worker.addRequest(request) for _ in iterate_timeout(30, 'waiting for nodescan'): if request.complete: @@ -252,16 +260,20 @@ class TestNodescanWorker(BaseTestCase): interface_ip='198.51.100.1', connection_port=22, connection_type='ssh', + host_key_checking=True, + boot_timeout=300, ) node2 = DummyProviderNode() node2._set( interface_ip='198.51.100.2', connection_port=22, connection_type='ssh', + host_key_checking=True, + boot_timeout=300, ) - request1 = NodescanRequest(node1, True, 300, self.log) - request2 = NodescanRequest(node2, True, 300, self.log) + request1 = NodescanRequest(node1, self.log) + request2 = NodescanRequest(node2, self.log) worker.addRequest(request1) worker.addRequest(request2) worker.start() diff --git a/zuul/launcher/server.py b/zuul/launcher/server.py index d54f122910..afeefbba4c 100644 --- a/zuul/launcher/server.py +++ b/zuul/launcher/server.py @@ -1,5 +1,5 @@ # Copyright 2024 BMW Group -# Copyright 2024 Acme Gating, LLC +# Copyright 2024-2025 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -267,11 +267,11 @@ class NodescanRequest: # For unit testing FAKE = False - def __init__(self, node, host_key_checking, timeout, log): + def __init__(self, node, log): self.state = self.START self.node = node - self.host_key_checking = host_key_checking - self.timeout = timeout + self.host_key_checking = node.host_key_checking + self.timeout = node.boot_timeout self.log = log self.complete = False self.keys = [] @@ -605,6 +605,9 @@ class NodescanWorker: except ValueError: pass + def length(self): + return len(self._active_requests) + len(self._pending_requests) + def registerDescriptor(self, fd): """Register the fd with the poll object""" # Oneshot means that once it triggers, it will automatically @@ -734,6 +737,7 @@ class Launcher: self._imageUpdatedCallback ) + self.nodescan_worker = NodescanWorker() self.launcher_thread = threading.Thread( target=self.run, name="Launcher", @@ -940,6 +944,8 @@ class Launcher: label=label.name, label_config_hash=label.config_hash, max_ready_age=label.max_ready_age, + host_key_checking=label.host_key_checking, + boot_timeout=label.boot_timeout, request_id=request.uuid, zuul_event_id=request.zuul_event_id, connection_name=provider.connection_name, @@ -1108,15 +1114,46 @@ class Launcher: if not node.create_state_machine.complete: self.wake_event.set() return - self._updateNodeFromInstance(node, instance) - node.setState(node.State.READY) - self.wake_event.set() - log.debug("Marking node %s as %s", node, node.state) + # Note this method has the side effect of updating + # node info from the instance. + if self._checkNodescanRequest(node, instance): + node.setState(node.State.READY) + self.wake_event.set() + log.debug("Marking node %s as %s", node, node.state) node.releaseLock(ctx) + def _checkNodescanRequest(self, node, instance): + if node.nodescan_request is None: + # We just finished the create state machine, update with + # new info. + self._updateNodeFromInstance(node, instance) + node.nodescan_request = NodescanRequest(node, self.log) + self.nodescan_worker.addRequest(node.nodescan_request) + self.log.debug( + "Submitted nodescan request for %s queue length %s", + node.interface_ip, + self.nodescan_worker.length()) + if not node.nodescan_request.complete: + return False + try: + keys = node.nodescan_request.result() + except Exception as e: + if isinstance(e, exceptions.ConnectionTimeoutException): + self.log.warning("Error scanning keys: %s", str(e)) + else: + self.log.exception("Exception scanning keys:") + raise exceptions.LaunchKeyscanException( + "Can't scan key for %s" % (node,)) + if keys: + node.host_keys = keys + return True + def _cleanupNode(self, node, log): with self.createZKContext(node._lock, self.log) as ctx: with node.activeContext(ctx): + self.nodescan_worker.removeRequest(node.nodescan_request) + node.nodescan_request = None + if not node.delete_state_machine: log.debug("Cleaning up node %s", node) provider = self._getProviderForNode( @@ -1162,6 +1199,7 @@ class Launcher: node_uuid = uuid.uuid4().hex # We don't pass a provider here as the node should not # be directly associated with a tenant or provider. + image = provider.images[label.image] tags = provider.getNodeTags( self.system.system_id, label, node_uuid) node_class = provider.driver.getProviderNodeClass() @@ -1172,12 +1210,17 @@ class Launcher: label=label.name, label_config_hash=label.config_hash, max_ready_age=label.max_ready_age, + host_key_checking=label.host_key_checking, + boot_timeout=label.boot_timeout, request_id=None, connection_name=provider.connection_name, zuul_event_id=uuid.uuid4().hex, tenant_name=None, provider=None, tags=tags, + # Set any node attributes we already know here + connection_port=image.connection_port, + connection_type=image.connection_type, ) self.log.debug("Created min-ready node %s via provider %s", node, provider) @@ -1355,6 +1398,9 @@ class Launcher: self.command_thread.daemon = True self.command_thread.start() + self.log.debug("Starting nodescan worker") + self.nodescan_worker.start() + self.log.debug("Starting launcher thread") self.launcher_thread.start() @@ -1368,6 +1414,7 @@ class Launcher: self.connections.stop() self.upload_executor.shutdown() self.endpoint_upload_executor.shutdown() + self.nodescan_worker.stop() # Endpoints are stopped by drivers self.log.debug("Stopped launcher") @@ -1380,6 +1427,7 @@ class Launcher: self.api.stop() self.zk_client.disconnect() self.tracing.stop() + self.nodescan_worker.join() self.log.debug("Joined launcher") def runCommand(self): diff --git a/zuul/model.py b/zuul/model.py index 09bd1763e6..41e31a7b09 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -2468,6 +2468,8 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin, connection_name="", create_state={}, delete_state={}, + host_key_checking=None, + boot_timeout=None, # Node data host_id=None, interface_ip=None, @@ -2487,10 +2489,12 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin, resources=None, attributes={}, tenant_name=None, + host_keys=[], # Attributes that are not serialized is_locked=False, create_state_machine=None, delete_state_machine=None, + nodescan_request=None, # Attributes set by the launcher _lscores=None, ) @@ -2552,24 +2556,27 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin, def getNodeData(self): return dict( - host_id=self.host_id, - interface_ip=self.interface_ip, - public_ipv4=self.public_ipv4, - private_ipv4=self.private_ipv4, - public_ipv6=self.public_ipv6, - private_ipv6=self.private_ipv6, + attributes=self.attributes, + az=self.az, + boot_timeout=self.boot_timeout, + cloud=self.cloud, connection_port=self.connection_port, connection_type=self.connection_type, - slot=self.slot, - az=self.az, - cloud=self.cloud, - provider=self.provider, - region=self.region, - username=self.username, hold_expiration=self.hold_expiration, + host_id=self.host_id, + host_key_checking=self.host_key_checking, + host_keys=self.host_keys, + interface_ip=self.interface_ip, + private_ipv4=self.private_ipv4, + private_ipv6=self.private_ipv6, + provider=self.provider, + public_ipv4=self.public_ipv4, + public_ipv6=self.public_ipv6, + region=self.region, resources=self.resources, - attributes=self.attributes, + slot=self.slot, tenant_name=self.tenant_name, + username=self.username, ) diff --git a/zuul/provider/schema.py b/zuul/provider/schema.py index 139c858dd2..43bb280fad 100644 --- a/zuul/provider/schema.py +++ b/zuul/provider/schema.py @@ -34,11 +34,13 @@ base_label = vs.Schema({ Optional('tags', default=dict): {str: str}, Optional('min_ready', default=0): int, Optional('max_ready_age', default=0): int, + Optional('boot-timeout', default=300): int, }) # Label attributes that are common to any kind of ssh-based driver. ssh_label = vs.Schema({ Optional('key-name'): Nullable(str), + Optional('host-key-checking', default=True): bool, }) # Images