Integrate nodescan worker into launcher

This checks for connectivity and scans for SSH host keys on new nodes.

Unlike nodepool, we store both the flag indicating whether we should
scan for keys and the boot timeout directly on the node.  This is
because later in the node creation process, we no longer have easy
access to the label that carries that configuration information (and
the label may change or no longer be present by the end).

Semi-related:

There are two locations in the launcher where we create new node
objects; one of them was missing some optional initializers related
to image information.  It is updated so that both are at parity.

Change-Id: I8ff7188cc8c8ae8f33ab1e92f3f5b0b91b9bfd41
This commit is contained in:
James E. Blair
2025-01-30 14:52:12 -08:00
parent 8b1d46d246
commit b2e5fd3d35
6 changed files with 118 additions and 28 deletions

View File

@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import testtools
from kazoo.exceptions import NoNodeError
@@ -29,6 +30,11 @@ class BaseCloudDriverTest(ZuulTestCase):
cloud_test_image_format = ''
cloud_test_provider_name = ''
def setUp(self):
self.useFixture(fixtures.MonkeyPatch(
'zuul.launcher.server.NodescanRequest.FAKE', True))
super().setUp()
def _getEndpoint(self):
# Use the launcher provider so that we're using the same ttl
# method caches.

View File

@@ -21,6 +21,7 @@ from unittest import mock
from zuul import model
from zuul.launcher.client import LauncherClient
import fixtures
import responses
import testtools
from kazoo.exceptions import NoNodeError
@@ -127,6 +128,8 @@ class LauncherBaseTestCase(ZuulTestCase):
self.mock_aws.start()
# Must start responses after mock_aws
self.useFixture(ImageMocksFixture())
self.useFixture(fixtures.MonkeyPatch(
'zuul.launcher.server.NodescanRequest.FAKE', True))
self.s3 = boto3.resource('s3', region_name='us-west-2')
self.s3.create_bucket(
Bucket='zuul',
@@ -424,10 +427,20 @@ class TestLauncher(LauncherBaseTestCase):
@simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
def test_jobs_executed(self):
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
nodes = self.launcher.api.nodes_cache.getItems()
self.assertNotEqual(nodes[0].host_keys, [])
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(self.getJobFromHistory('check-job').result,
'SUCCESS')
self.assertEqual(A.data['status'], 'MERGED')
@@ -834,7 +847,9 @@ class TestMinReadyLauncher(LauncherBaseTestCase):
if len(in_use_nodes) == 2:
break
self.executor_server.hold_jobs_in_build = True
self.assertNotEqual(nodes[0].host_keys, [])
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()

View File

@@ -128,9 +128,11 @@ class TestNodescanWorker(BaseTestCase):
interface_ip='198.51.100.1',
connection_port=22,
connection_type='ssh',
host_key_checking=True,
boot_timeout=300,
)
worker.start()
request = NodescanRequest(node, True, 300, self.log)
request = NodescanRequest(node, self.log)
worker.addRequest(request)
for _ in iterate_timeout(30, 'waiting for nodescan'):
if request.complete:
@@ -156,9 +158,11 @@ class TestNodescanWorker(BaseTestCase):
interface_ip='198.51.100.1',
connection_port=22,
connection_type='ssh',
host_key_checking=True,
boot_timeout=1,
)
worker.start()
request = NodescanRequest(node, True, 1, self.log)
request = NodescanRequest(node, self.log)
worker.addRequest(request)
for _ in iterate_timeout(30, 'waiting for nodescan'):
if request.complete:
@@ -185,9 +189,11 @@ class TestNodescanWorker(BaseTestCase):
interface_ip='198.51.100.1',
connection_port=22,
connection_type='ssh',
host_key_checking=True,
boot_timeout=1,
)
worker.start()
request = NodescanRequest(node, True, 1, self.log)
request = NodescanRequest(node, self.log)
worker.addRequest(request)
for _ in iterate_timeout(30, 'waiting for nodescan'):
if request.complete:
@@ -215,9 +221,11 @@ class TestNodescanWorker(BaseTestCase):
interface_ip='198.51.100.1',
connection_port=22,
connection_type='ssh',
host_key_checking=True,
boot_timeout=1,
)
worker.start()
request = NodescanRequest(node, True, 1, self.log)
request = NodescanRequest(node, self.log)
worker.addRequest(request)
for _ in iterate_timeout(30, 'waiting for nodescan'):
if request.complete:
@@ -252,16 +260,20 @@ class TestNodescanWorker(BaseTestCase):
interface_ip='198.51.100.1',
connection_port=22,
connection_type='ssh',
host_key_checking=True,
boot_timeout=300,
)
node2 = DummyProviderNode()
node2._set(
interface_ip='198.51.100.2',
connection_port=22,
connection_type='ssh',
host_key_checking=True,
boot_timeout=300,
)
request1 = NodescanRequest(node1, True, 300, self.log)
request2 = NodescanRequest(node2, True, 300, self.log)
request1 = NodescanRequest(node1, self.log)
request2 = NodescanRequest(node2, self.log)
worker.addRequest(request1)
worker.addRequest(request2)
worker.start()

View File

@@ -1,5 +1,5 @@
# Copyright 2024 BMW Group
# Copyright 2024 Acme Gating, LLC
# Copyright 2024-2025 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -267,11 +267,11 @@ class NodescanRequest:
# For unit testing
FAKE = False
def __init__(self, node, host_key_checking, timeout, log):
def __init__(self, node, log):
self.state = self.START
self.node = node
self.host_key_checking = host_key_checking
self.timeout = timeout
self.host_key_checking = node.host_key_checking
self.timeout = node.boot_timeout
self.log = log
self.complete = False
self.keys = []
@@ -605,6 +605,9 @@ class NodescanWorker:
except ValueError:
pass
def length(self):
return len(self._active_requests) + len(self._pending_requests)
def registerDescriptor(self, fd):
"""Register the fd with the poll object"""
# Oneshot means that once it triggers, it will automatically
@@ -734,6 +737,7 @@ class Launcher:
self._imageUpdatedCallback
)
self.nodescan_worker = NodescanWorker()
self.launcher_thread = threading.Thread(
target=self.run,
name="Launcher",
@@ -940,6 +944,8 @@ class Launcher:
label=label.name,
label_config_hash=label.config_hash,
max_ready_age=label.max_ready_age,
host_key_checking=label.host_key_checking,
boot_timeout=label.boot_timeout,
request_id=request.uuid,
zuul_event_id=request.zuul_event_id,
connection_name=provider.connection_name,
@@ -1108,15 +1114,46 @@ class Launcher:
if not node.create_state_machine.complete:
self.wake_event.set()
return
self._updateNodeFromInstance(node, instance)
node.setState(node.State.READY)
self.wake_event.set()
log.debug("Marking node %s as %s", node, node.state)
# Note this method has the side effect of updating
# node info from the instance.
if self._checkNodescanRequest(node, instance):
node.setState(node.State.READY)
self.wake_event.set()
log.debug("Marking node %s as %s", node, node.state)
node.releaseLock(ctx)
def _checkNodescanRequest(self, node, instance):
if node.nodescan_request is None:
# We just finished the create state machine, update with
# new info.
self._updateNodeFromInstance(node, instance)
node.nodescan_request = NodescanRequest(node, self.log)
self.nodescan_worker.addRequest(node.nodescan_request)
self.log.debug(
"Submitted nodescan request for %s queue length %s",
node.interface_ip,
self.nodescan_worker.length())
if not node.nodescan_request.complete:
return False
try:
keys = node.nodescan_request.result()
except Exception as e:
if isinstance(e, exceptions.ConnectionTimeoutException):
self.log.warning("Error scanning keys: %s", str(e))
else:
self.log.exception("Exception scanning keys:")
raise exceptions.LaunchKeyscanException(
"Can't scan key for %s" % (node,))
if keys:
node.host_keys = keys
return True
def _cleanupNode(self, node, log):
with self.createZKContext(node._lock, self.log) as ctx:
with node.activeContext(ctx):
self.nodescan_worker.removeRequest(node.nodescan_request)
node.nodescan_request = None
if not node.delete_state_machine:
log.debug("Cleaning up node %s", node)
provider = self._getProviderForNode(
@@ -1162,6 +1199,7 @@ class Launcher:
node_uuid = uuid.uuid4().hex
# We don't pass a provider here as the node should not
# be directly associated with a tenant or provider.
image = provider.images[label.image]
tags = provider.getNodeTags(
self.system.system_id, label, node_uuid)
node_class = provider.driver.getProviderNodeClass()
@@ -1172,12 +1210,17 @@ class Launcher:
label=label.name,
label_config_hash=label.config_hash,
max_ready_age=label.max_ready_age,
host_key_checking=label.host_key_checking,
boot_timeout=label.boot_timeout,
request_id=None,
connection_name=provider.connection_name,
zuul_event_id=uuid.uuid4().hex,
tenant_name=None,
provider=None,
tags=tags,
# Set any node attributes we already know here
connection_port=image.connection_port,
connection_type=image.connection_type,
)
self.log.debug("Created min-ready node %s via provider %s",
node, provider)
@@ -1355,6 +1398,9 @@ class Launcher:
self.command_thread.daemon = True
self.command_thread.start()
self.log.debug("Starting nodescan worker")
self.nodescan_worker.start()
self.log.debug("Starting launcher thread")
self.launcher_thread.start()
@@ -1368,6 +1414,7 @@ class Launcher:
self.connections.stop()
self.upload_executor.shutdown()
self.endpoint_upload_executor.shutdown()
self.nodescan_worker.stop()
# Endpoints are stopped by drivers
self.log.debug("Stopped launcher")
@@ -1380,6 +1427,7 @@ class Launcher:
self.api.stop()
self.zk_client.disconnect()
self.tracing.stop()
self.nodescan_worker.join()
self.log.debug("Joined launcher")
def runCommand(self):

View File

@@ -2468,6 +2468,8 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin,
connection_name="",
create_state={},
delete_state={},
host_key_checking=None,
boot_timeout=None,
# Node data
host_id=None,
interface_ip=None,
@@ -2487,10 +2489,12 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin,
resources=None,
attributes={},
tenant_name=None,
host_keys=[],
# Attributes that are not serialized
is_locked=False,
create_state_machine=None,
delete_state_machine=None,
nodescan_request=None,
# Attributes set by the launcher
_lscores=None,
)
@@ -2552,24 +2556,27 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin,
def getNodeData(self):
return dict(
host_id=self.host_id,
interface_ip=self.interface_ip,
public_ipv4=self.public_ipv4,
private_ipv4=self.private_ipv4,
public_ipv6=self.public_ipv6,
private_ipv6=self.private_ipv6,
attributes=self.attributes,
az=self.az,
boot_timeout=self.boot_timeout,
cloud=self.cloud,
connection_port=self.connection_port,
connection_type=self.connection_type,
slot=self.slot,
az=self.az,
cloud=self.cloud,
provider=self.provider,
region=self.region,
username=self.username,
hold_expiration=self.hold_expiration,
host_id=self.host_id,
host_key_checking=self.host_key_checking,
host_keys=self.host_keys,
interface_ip=self.interface_ip,
private_ipv4=self.private_ipv4,
private_ipv6=self.private_ipv6,
provider=self.provider,
public_ipv4=self.public_ipv4,
public_ipv6=self.public_ipv6,
region=self.region,
resources=self.resources,
attributes=self.attributes,
slot=self.slot,
tenant_name=self.tenant_name,
username=self.username,
)

View File

@@ -34,11 +34,13 @@ base_label = vs.Schema({
Optional('tags', default=dict): {str: str},
Optional('min_ready', default=0): int,
Optional('max_ready_age', default=0): int,
Optional('boot-timeout', default=300): int,
})
# Label attributes that are common to any kind of ssh-based driver.
ssh_label = vs.Schema({
Optional('key-name'): Nullable(str),
Optional('host-key-checking', default=True): bool,
})
# Images