Merge "Integrate nodescan worker into launcher"
commit 03d5ba8181
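Change summary (reconstructed from the diff below, not the original commit message): NodescanRequest now takes only (node, log) and reads host_key_checking and boot_timeout from the node itself; ProviderNode gains those attributes plus host_keys and a transient nodescan_request; the Launcher owns a NodescanWorker, starting, stopping, and joining it alongside its other threads, and marks a node READY only once _checkNodescanRequest reports a completed scan; label configuration gains boot-timeout and host-key-checking options to populate the new node attributes.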
@@ -12,6 +12,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import fixtures
 import testtools
 from kazoo.exceptions import NoNodeError
 
@@ -29,6 +30,11 @@ class BaseCloudDriverTest(ZuulTestCase):
     cloud_test_image_format = ''
     cloud_test_provider_name = ''
 
+    def setUp(self):
+        self.useFixture(fixtures.MonkeyPatch(
+            'zuul.launcher.server.NodescanRequest.FAKE', True))
+        super().setUp()
+
     def _getEndpoint(self):
         # Use the launcher provider so that we're using the same ttl
         # method caches.
@@ -21,6 +21,7 @@ from unittest import mock
 from zuul import model
 from zuul.launcher.client import LauncherClient
 
+import fixtures
 import responses
 import testtools
 from kazoo.exceptions import NoNodeError
@@ -127,6 +128,8 @@ class LauncherBaseTestCase(ZuulTestCase):
         self.mock_aws.start()
         # Must start responses after mock_aws
         self.useFixture(ImageMocksFixture())
+        self.useFixture(fixtures.MonkeyPatch(
+            'zuul.launcher.server.NodescanRequest.FAKE', True))
         self.s3 = boto3.resource('s3', region_name='us-west-2')
         self.s3.create_bucket(
             Bucket='zuul',
@@ -424,10 +427,20 @@ class TestLauncher(LauncherBaseTestCase):
 
     @simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
     def test_jobs_executed(self):
+        self.executor_server.hold_jobs_in_build = True
+
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('Code-Review', 2)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
         self.waitUntilSettled()
+
+        nodes = self.launcher.api.nodes_cache.getItems()
+        self.assertNotEqual(nodes[0].host_keys, [])
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
         self.assertEqual(self.getJobFromHistory('check-job').result,
                          'SUCCESS')
         self.assertEqual(A.data['status'], 'MERGED')
@@ -834,7 +847,9 @@ class TestMinReadyLauncher(LauncherBaseTestCase):
             if len(in_use_nodes) == 2:
                 break
 
+        self.executor_server.hold_jobs_in_build = True
+        self.assertNotEqual(nodes[0].host_keys, [])
 
         self.executor_server.hold_jobs_in_build = False
         self.executor_server.release()
         self.waitUntilSettled()
@@ -128,9 +128,11 @@ class TestNodescanWorker(BaseTestCase):
             interface_ip='198.51.100.1',
             connection_port=22,
             connection_type='ssh',
+            host_key_checking=True,
+            boot_timeout=300,
         )
         worker.start()
-        request = NodescanRequest(node, True, 300, self.log)
+        request = NodescanRequest(node, self.log)
         worker.addRequest(request)
         for _ in iterate_timeout(30, 'waiting for nodescan'):
             if request.complete:
@@ -156,9 +158,11 @@ class TestNodescanWorker(BaseTestCase):
             interface_ip='198.51.100.1',
             connection_port=22,
             connection_type='ssh',
+            host_key_checking=True,
+            boot_timeout=1,
         )
         worker.start()
-        request = NodescanRequest(node, True, 1, self.log)
+        request = NodescanRequest(node, self.log)
         worker.addRequest(request)
         for _ in iterate_timeout(30, 'waiting for nodescan'):
             if request.complete:
@@ -185,9 +189,11 @@ class TestNodescanWorker(BaseTestCase):
             interface_ip='198.51.100.1',
             connection_port=22,
             connection_type='ssh',
+            host_key_checking=True,
+            boot_timeout=1,
         )
         worker.start()
-        request = NodescanRequest(node, True, 1, self.log)
+        request = NodescanRequest(node, self.log)
         worker.addRequest(request)
         for _ in iterate_timeout(30, 'waiting for nodescan'):
             if request.complete:
@@ -215,9 +221,11 @@ class TestNodescanWorker(BaseTestCase):
             interface_ip='198.51.100.1',
             connection_port=22,
             connection_type='ssh',
+            host_key_checking=True,
+            boot_timeout=1,
         )
         worker.start()
-        request = NodescanRequest(node, True, 1, self.log)
+        request = NodescanRequest(node, self.log)
         worker.addRequest(request)
         for _ in iterate_timeout(30, 'waiting for nodescan'):
             if request.complete:
@@ -252,16 +260,20 @@ class TestNodescanWorker(BaseTestCase):
             interface_ip='198.51.100.1',
             connection_port=22,
             connection_type='ssh',
+            host_key_checking=True,
+            boot_timeout=300,
         )
         node2 = DummyProviderNode()
         node2._set(
             interface_ip='198.51.100.2',
             connection_port=22,
             connection_type='ssh',
+            host_key_checking=True,
+            boot_timeout=300,
         )
 
-        request1 = NodescanRequest(node1, True, 300, self.log)
-        request2 = NodescanRequest(node2, True, 300, self.log)
+        request1 = NodescanRequest(node1, self.log)
+        request2 = NodescanRequest(node2, self.log)
         worker.addRequest(request1)
         worker.addRequest(request2)
         worker.start()
@@ -1,5 +1,5 @@
 # Copyright 2024 BMW Group
-# Copyright 2024 Acme Gating, LLC
+# Copyright 2024-2025 Acme Gating, LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
 # not use this file except in compliance with the License. You may obtain
@@ -267,11 +267,11 @@ class NodescanRequest:
     # For unit testing
     FAKE = False
 
-    def __init__(self, node, host_key_checking, timeout, log):
+    def __init__(self, node, log):
         self.state = self.START
         self.node = node
-        self.host_key_checking = host_key_checking
-        self.timeout = timeout
+        self.host_key_checking = node.host_key_checking
+        self.timeout = node.boot_timeout
         self.log = log
         self.complete = False
         self.keys = []
@@ -605,6 +605,9 @@ class NodescanWorker:
         except ValueError:
             pass
 
+    def length(self):
+        return len(self._active_requests) + len(self._pending_requests)
+
     def registerDescriptor(self, fd):
         """Register the fd with the poll object"""
         # Oneshot means that once it triggers, it will automatically
@@ -734,6 +737,7 @@ class Launcher:
             self._imageUpdatedCallback
         )
 
+        self.nodescan_worker = NodescanWorker()
         self.launcher_thread = threading.Thread(
             target=self.run,
             name="Launcher",
@@ -940,6 +944,8 @@ class Launcher:
             label=label.name,
             label_config_hash=label.config_hash,
             max_ready_age=label.max_ready_age,
+            host_key_checking=label.host_key_checking,
+            boot_timeout=label.boot_timeout,
             request_id=request.uuid,
             zuul_event_id=request.zuul_event_id,
             connection_name=provider.connection_name,
@@ -1108,15 +1114,46 @@ class Launcher:
             if not node.create_state_machine.complete:
                 self.wake_event.set()
                 return
-            self._updateNodeFromInstance(node, instance)
-            node.setState(node.State.READY)
-            self.wake_event.set()
-            log.debug("Marking node %s as %s", node, node.state)
+            # Note this method has the side effect of updating
+            # node info from the instance.
+            if self._checkNodescanRequest(node, instance):
+                node.setState(node.State.READY)
+                self.wake_event.set()
+                log.debug("Marking node %s as %s", node, node.state)
             node.releaseLock(ctx)
 
+    def _checkNodescanRequest(self, node, instance):
+        if node.nodescan_request is None:
+            # We just finished the create state machine, update with
+            # new info.
+            self._updateNodeFromInstance(node, instance)
+            node.nodescan_request = NodescanRequest(node, self.log)
+            self.nodescan_worker.addRequest(node.nodescan_request)
+            self.log.debug(
+                "Submitted nodescan request for %s queue length %s",
+                node.interface_ip,
+                self.nodescan_worker.length())
+        if not node.nodescan_request.complete:
+            return False
+        try:
+            keys = node.nodescan_request.result()
+        except Exception as e:
+            if isinstance(e, exceptions.ConnectionTimeoutException):
+                self.log.warning("Error scanning keys: %s", str(e))
+            else:
+                self.log.exception("Exception scanning keys:")
+            raise exceptions.LaunchKeyscanException(
+                "Can't scan key for %s" % (node,))
+        if keys:
+            node.host_keys = keys
+        return True
+
     def _cleanupNode(self, node, log):
+        with self.createZKContext(node._lock, self.log) as ctx:
+            with node.activeContext(ctx):
+                self.nodescan_worker.removeRequest(node.nodescan_request)
+                node.nodescan_request = None
 
         if not node.delete_state_machine:
             log.debug("Cleaning up node %s", node)
             provider = self._getProviderForNode(
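The hunk above is the core of the integration: a polled readiness check replaces the old inline scan. A condensed sketch of the request lifecycle it implements — not part of the commit; node and log are assumed to be a ProviderNode (with the new attributes from the model hunks below) and a logger:

    worker = NodescanWorker()
    worker.start()

    # host_key_checking and boot_timeout are now read from the node itself.
    request = NodescanRequest(node, log)
    worker.addRequest(request)

    # The launcher polls rather than blocks: _checkNodescanRequest returns
    # False until the worker sets request.complete.
    if request.complete:
        keys = request.result()  # raises on scan failure; the launcher
        if keys:                 # wraps that in LaunchKeyscanException
            node.host_keys = keys

    # Node cleanup withdraws any outstanding request before deletion.
    worker.removeRequest(request)
    worker.stop()
    worker.join()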
@@ -1162,6 +1199,7 @@ class Launcher:
         node_uuid = uuid.uuid4().hex
         # We don't pass a provider here as the node should not
         # be directly associated with a tenant or provider.
+        image = provider.images[label.image]
         tags = provider.getNodeTags(
             self.system.system_id, label, node_uuid)
         node_class = provider.driver.getProviderNodeClass()
@@ -1172,12 +1210,17 @@ class Launcher:
             label=label.name,
             label_config_hash=label.config_hash,
             max_ready_age=label.max_ready_age,
+            host_key_checking=label.host_key_checking,
+            boot_timeout=label.boot_timeout,
             request_id=None,
             connection_name=provider.connection_name,
             zuul_event_id=uuid.uuid4().hex,
             tenant_name=None,
             provider=None,
             tags=tags,
+            # Set any node attributes we already know here
+            connection_port=image.connection_port,
+            connection_type=image.connection_type,
         )
         self.log.debug("Created min-ready node %s via provider %s",
                        node, provider)
@@ -1355,6 +1398,9 @@ class Launcher:
         self.command_thread.daemon = True
         self.command_thread.start()
 
+        self.log.debug("Starting nodescan worker")
+        self.nodescan_worker.start()
+
         self.log.debug("Starting launcher thread")
         self.launcher_thread.start()
 
@@ -1368,6 +1414,7 @@ class Launcher:
         self.connections.stop()
         self.upload_executor.shutdown()
         self.endpoint_upload_executor.shutdown()
+        self.nodescan_worker.stop()
         # Endpoints are stopped by drivers
         self.log.debug("Stopped launcher")
 
@@ -1380,6 +1427,7 @@ class Launcher:
         self.api.stop()
         self.zk_client.disconnect()
         self.tracing.stop()
+        self.nodescan_worker.join()
         self.log.debug("Joined launcher")
 
     def runCommand(self):
@@ -2468,6 +2468,8 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin,
             connection_name="",
             create_state={},
             delete_state={},
+            host_key_checking=None,
+            boot_timeout=None,
             # Node data
             host_id=None,
             interface_ip=None,
@@ -2487,10 +2489,12 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin,
             resources=None,
             attributes={},
             tenant_name=None,
+            host_keys=[],
             # Attributes that are not serialized
             is_locked=False,
             create_state_machine=None,
             delete_state_machine=None,
+            nodescan_request=None,
             # Attributes set by the launcher
             _lscores=None,
         )
@@ -2552,24 +2556,27 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin,
 
     def getNodeData(self):
         return dict(
-            host_id=self.host_id,
-            interface_ip=self.interface_ip,
-            public_ipv4=self.public_ipv4,
-            private_ipv4=self.private_ipv4,
-            public_ipv6=self.public_ipv6,
-            private_ipv6=self.private_ipv6,
+            attributes=self.attributes,
+            az=self.az,
+            boot_timeout=self.boot_timeout,
+            cloud=self.cloud,
             connection_port=self.connection_port,
             connection_type=self.connection_type,
-            slot=self.slot,
-            az=self.az,
-            cloud=self.cloud,
-            provider=self.provider,
-            region=self.region,
-            username=self.username,
             hold_expiration=self.hold_expiration,
+            host_id=self.host_id,
+            host_key_checking=self.host_key_checking,
+            host_keys=self.host_keys,
+            interface_ip=self.interface_ip,
+            private_ipv4=self.private_ipv4,
+            private_ipv6=self.private_ipv6,
+            provider=self.provider,
+            public_ipv4=self.public_ipv4,
+            public_ipv6=self.public_ipv6,
+            region=self.region,
             resources=self.resources,
-            attributes=self.attributes,
+            slot=self.slot,
             tenant_name=self.tenant_name,
+            username=self.username,
         )
 
 
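The getNodeData hunk above is almost entirely a reordering of the returned dict into alphabetical order; the only new entries are host_key_checking and host_keys.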
@@ -34,11 +34,13 @@ base_label = vs.Schema({
     Optional('tags', default=dict): {str: str},
     Optional('min_ready', default=0): int,
     Optional('max_ready_age', default=0): int,
+    Optional('boot-timeout', default=300): int,
 })
 
 # Label attributes that are common to any kind of ssh-based driver.
 ssh_label = vs.Schema({
     Optional('key-name'): Nullable(str),
+    Optional('host-key-checking', default=True): bool,
 })
 
 # Images
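These schema options are the source of the new node attributes: the launcher copies label.boot_timeout and label.host_key_checking onto each node it creates (see the Launcher hunks above). A hypothetical label fragment exercising both — key names from the schema, values invented:

    label = {
        'boot-timeout': 120,          # seconds allowed for boot and key scan; default 300
        'host-key-checking': False,   # ssh-based drivers only; default True
    }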