Add niz cleanup

This adds leaked resource cleanup, using rendezvous hashing to
resolve which launcher should clean each endpoint.  There is
no secondary locking since a collision on cleanup is not critical.
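
As an illustration, a minimal sketch of the owner-selection scheme
(names here are hypothetical; the real logic lives in the launcher's
CleanupWorker below):

    import mmh3

    def cleanup_owner(endpoint_cname, launcher_names):
        # Rendezvous hashing: every launcher computes the same
        # deterministic scores, so all launchers independently
        # agree on one owner per endpoint without taking a lock.
        return min(
            launcher_names,
            key=lambda name: mmh3.hash(
                f'{name}-{endpoint_cname}', signed=False))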

This adds more testing than nodepool had for leaked floating IP
and port cleanup in OpenStack.  Options to configure the cleanup
behavior are added.

Note that the options are set on the providers, but they really
affect endpoints.  This is an unavoidable mismatch, so we use the
most generous values: any OpenStack provider on an endpoint can
enable floating IP or port cleanup for that endpoint.
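
Condensed from the endpoint change below, the aggregation amounts to
the following sketch (attribute names as in the provider schema):

    def endpoint_cleanup_policy(providers):
        # Any provider on the endpoint may enable floating IP cleanup.
        fip_cleanup = any(p.floating_ip_cleanup for p in providers)
        # The shortest configured port-cleanup interval wins; zero
        # (no provider configured one) disables port cleanup.
        intervals = [p.port_cleanup_interval for p in providers
                     if p.port_cleanup_interval]
        port_interval = min(intervals or [0])
        return fip_cleanup, port_interval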

The AWS tests used different regions, which caused issues with
resources being partitioned.  Standardize on us-east-1, since that
is what most tests used.

AWS cleanup requires a bucket name in order to clean up leaked
image upload objects.  The bucket name is configured on a provider,
which is again an unavoidable mismatch for endpoint cleanups.  To
resolve this, we look at the bucket names of all providers for an
endpoint when we perform resource cleanup.
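
Sketched from the listResources change below, the cleanup pass
collects every configured bucket:

    def bucket_names_for(providers):
        # A leaked image upload object may live in any provider's
        # configured bucket, so consider them all.
        names = set()
        for provider in providers:
            if bn := provider.object_storage.get('bucket-name'):
                names.add(bn)
        return names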

The shutdown sequence for moto is adjusted so that it happens
at the end of the normal test shutdown sequence.

The system_id is added to the provider and endpoint objects so
that they can construct an appropriate metadata dictionary
at any point.
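
For example (a hedged sketch; the tag keys match those asserted in
the tests below), an object holding the system_id can build node
metadata on its own:

    def node_metadata(self, node_uuid):
        return {
            'zuul_system_id': self.system_id,
            'zuul_node_uuid': node_uuid,
        }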

Change-Id: I373df424e7db3d5ac22cdcd2d29a243ed960201d
James E. Blair 2025-04-09 14:15:14 -07:00
parent dc30800734
commit e7d746c42d
24 changed files with 643 additions and 168 deletions

View File

@@ -2551,6 +2551,7 @@ class ZuulTestCase(BaseTestCase):
launcher = TestLauncher(
self.config,
launcher_connections)
launcher._start_cleanup = False
launcher.start()
return launcher

View File

@@ -141,8 +141,8 @@ class FakeAws:
def __init__(self):
self.tasks = {}
self.ec2 = boto3.resource('ec2', region_name='us-west-2')
self.ec2_client = boto3.client('ec2', region_name='us-west-2')
self.ec2 = boto3.resource('ec2', region_name='us-east-1')
self.ec2_client = boto3.client('ec2', region_name='us-east-1')
self.fail_import_count = 0
def import_snapshot(self, *args, **kw):
@@ -165,7 +165,7 @@ class FakeAws:
# Make a Volume to simulate the import finishing
volume = self.ec2_client.create_volume(
Size=80,
AvailabilityZone='us-west-2')
AvailabilityZone='us-east-1')
snap_id = self.ec2_client.create_snapshot(
VolumeId=volume['VolumeId'],
)["SnapshotId"]
@@ -207,7 +207,7 @@ class FakeAws:
volume = self.ec2_client.create_volume(
Size=80,
AvailabilityZone='us-west-2')
AvailabilityZone='us-east-1')
snap_id = self.ec2_client.create_snapshot(
VolumeId=volume['VolumeId'],
Description=f'imported volume import-ami-{task_id}',

View File

@@ -76,6 +76,10 @@ class FakeOpenstackFloatingIp(FakeOpenstackObject):
}
class FakeOpenstackPort(FakeOpenstackObject):
pass
class FakeOpenstackCloud:
log = logging.getLogger("zuul.FakeOpenstackCloud")
@@ -103,6 +107,7 @@ class FakeOpenstackCloud:
name='fake-network',
)
]
self.ports = []
def _getConnection(self):
return FakeOpenstackConnection(self)
@@ -156,12 +161,29 @@ class FakeOpenstackConnection:
def _has_floating_ips(self):
return False
def delete_unattached_floating_ips(self):
# Iterate over a copy, since we remove items from the list.
for x in list(self.cloud.floating_ips):
if not getattr(x, 'port', None):
self.cloud.floating_ips.remove(x)
def list_flavors(self, get_extra=False):
return self.cloud.flavors
def list_volumes(self):
return self.cloud.volumes
def list_ports(self, filters=None):
if filters and filters.get('status'):
return [p for p in self.cloud.ports
if p.status == filters['status']]
return self.cloud.ports
def delete_port(self, port_id):
for x in self.cloud.ports:
if x.id == port_id:
self.cloud.ports.remove(x)
return
def get_network(self, name_or_id, filters=None):
for x in self.cloud.networks:
if x.id == name_or_id or x.name == name_or_id:
@@ -198,6 +220,7 @@ class FakeOpenstackConnection:
addresses=addresses,
interface_ip=interface_ip,
flavor=flavor,
metadata=meta,
)
server = FakeOpenstackServer(**args)
self.cloud.servers.append(server)

View File

@@ -69,13 +69,13 @@
- name: debian-local
- section:
name: aws-us-west-2
name: aws-us-east-1
parent: aws-base
region: us-west-2
region: us-east-1
- provider:
name: aws-us-west-2-main
section: aws-us-west-2
name: aws-us-east-1-main
section: aws-us-east-1
labels:
- name: debian-local-normal
key-name: zuul

View File

@@ -69,13 +69,13 @@
import-method: image
- section:
name: aws-us-west-2
name: aws-us-east-1
parent: aws-base
region: us-west-2
region: us-east-1
- provider:
name: aws-us-west-2-main
section: aws-us-west-2
name: aws-us-east-1-main
section: aws-us-east-1
labels:
- name: debian-local-normal
key-name: zuul

View File

@@ -68,13 +68,13 @@
- name: debian-local
- section:
name: aws-us-west-2
name: aws-us-east-1
parent: aws-base
region: us-west-2
region: us-east-1
- provider:
name: aws-us-west-2-main
section: aws-us-west-2
name: aws-us-east-1-main
section: aws-us-east-1
labels:
- name: debian-local-normal
key-name: zuul

View File

@@ -105,6 +105,8 @@
launch-timeout: 600
launch-attempts: 2
key-name: zuul
floating-ip-cleanup: true
port-cleanup-interval: 1
flavors:
- name: normal
flavor-name: Fake Flavor

View File

@@ -17,10 +17,12 @@ import concurrent.futures
import contextlib
import time
from unittest import mock
import urllib.parse
import fixtures
from moto import mock_aws
import boto3
import botocore.exceptions
from zuul.driver.aws import AwsDriver
from zuul.driver.aws.awsmodel import AwsProviderNode
@@ -145,9 +147,9 @@ class TestAwsDriver(BaseCloudDriverTest):
super().setUp()
def tearDown(self):
def shutdown(self):
super().shutdown()
self.mock_aws.stop()
super().tearDown()
def _assertProviderNodeAttributes(self, pnode):
super()._assertProviderNodeAttributes(pnode)
@@ -289,6 +291,246 @@ class TestAwsDriver(BaseCloudDriverTest):
def test_aws_diskimage_ebs_direct(self):
self._test_diskimage()
@simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
def test_aws_resource_cleanup(self):
self.waitUntilSettled()
self.launcher.cleanup_worker.INTERVAL = 1
# This tests everything except the image imports
# Start by setting up leaked resources
system_id = self.launcher.system.system_id
instance_tags = [
{'Key': 'zuul_system_id', 'Value': system_id},
{'Key': 'zuul_node_uuid', 'Value': '0000000042'},
]
s3_tags = {
'zuul_system_id': system_id,
'zuul_upload_uuid': '0000000042',
}
reservation = self.ec2_client.run_instances(
ImageId="ami-12c6146b", MinCount=1, MaxCount=1,
BlockDeviceMappings=[{
'DeviceName': '/dev/sda1',
'Ebs': {
'VolumeSize': 80,
'DeleteOnTermination': False
}
}],
TagSpecifications=[{
'ResourceType': 'instance',
'Tags': instance_tags
}, {
'ResourceType': 'volume',
'Tags': instance_tags
}]
)
instance_id = reservation['Instances'][0]['InstanceId']
bucket = self.s3.Bucket('zuul')
bucket.put_object(Body=b'hi',
Key='testimage',
Tagging=urllib.parse.urlencode(s3_tags))
obj = self.s3.Object('zuul', 'testimage')
# This effectively asserts the object exists
self.s3_client.get_object_tagging(
Bucket=obj.bucket_name, Key=obj.key)
instance = self.ec2.Instance(instance_id)
self.assertEqual(instance.state['Name'], 'running')
volume_id = list(instance.volumes.all())[0].id
volume = self.ec2.Volume(volume_id)
self.assertEqual(volume.state, 'in-use')
self.log.debug("Start cleanup worker")
self.launcher.cleanup_worker.start()
for _ in iterate_timeout(30, 'instance deletion'):
instance = self.ec2.Instance(instance_id)
if instance.state['Name'] == 'terminated':
break
time.sleep(1)
for _ in iterate_timeout(30, 'volume deletion'):
volume = self.ec2.Volume(volume_id)
try:
if volume.state == 'deleted':
break
except botocore.exceptions.ClientError:
# Probably not found
break
time.sleep(1)
for _ in iterate_timeout(30, 'object deletion'):
obj = self.s3.Object('zuul', 'testimage')
try:
self.s3_client.get_object_tagging(
Bucket=obj.bucket_name, Key=obj.key)
except self.s3_client.exceptions.NoSuchKey:
break
time.sleep(1)
@simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
def test_aws_resource_cleanup_import_snapshot(self):
# This tests the import_snapshot path
self.waitUntilSettled()
self.launcher.cleanup_worker.INTERVAL = 1
system_id = self.launcher.system.system_id
# Start by setting up leaked resources
image_tags = [
{'Key': 'zuul_system_id', 'Value': system_id},
{'Key': 'zuul_upload_uuid', 'Value': '0000000042'},
]
task = self.fake_aws.import_snapshot(
DiskContainer={
'Format': 'ova',
'UserBucket': {
'S3Bucket': 'zuul',
'S3Key': 'testfile',
}
},
TagSpecifications=[{
'ResourceType': 'import-snapshot-task',
'Tags': image_tags,
}])
snapshot_id = self.fake_aws.finish_import_snapshot(task)
register_response = self.ec2_client.register_image(
Architecture='amd64',
BlockDeviceMappings=[
{
'DeviceName': '/dev/sda1',
'Ebs': {
'DeleteOnTermination': True,
'SnapshotId': snapshot_id,
'VolumeSize': 20,
'VolumeType': 'gp2',
},
},
],
RootDeviceName='/dev/sda1',
VirtualizationType='hvm',
Name='testimage',
)
image_id = register_response['ImageId']
ami = self.ec2.Image(image_id)
new_snapshot_id = ami.block_device_mappings[0]['Ebs']['SnapshotId']
self.fake_aws.change_snapshot_id(task, new_snapshot_id)
# Note that the resulting image and snapshot do not have tags
# applied, so we test the automatic retagging methods in the
# adapter.
image = self.ec2.Image(image_id)
self.assertEqual(image.state, 'available')
snap = self.ec2.Snapshot(snapshot_id)
self.assertEqual(snap.state, 'completed')
# Now that the leaked resources exist, start the worker and
# wait for it to clean them.
self.log.debug("Start cleanup worker")
self.launcher.cleanup_worker.start()
for _ in iterate_timeout(30, 'ami deletion'):
image = self.ec2.Image(image_id)
try:
# If this has a value the image was not deleted
if image.state == 'available':
# Definitely not deleted yet
pass
except AttributeError:
# Per AWS API, a recently deleted image is empty and
# looking at the state raises an AttributeError; see
# https://github.com/boto/boto3/issues/2531. The image
# was deleted, so we continue on here
break
time.sleep(1)
for _ in iterate_timeout(30, 'snapshot deletion'):
snap = self.ec2.Snapshot(new_snapshot_id)
try:
if snap.state == 'deleted':
break
except botocore.exceptions.ClientError:
# Probably not found
break
time.sleep(1)
@simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
def test_aws_resource_cleanup_import_image(self):
# This tests the import_image path
self.waitUntilSettled()
self.launcher.cleanup_worker.INTERVAL = 1
system_id = self.launcher.system.system_id
# Start by setting up leaked resources
image_tags = [
{'Key': 'zuul_system_id', 'Value': system_id},
{'Key': 'zuul_upload_uuid', 'Value': '0000000042'},
]
# The image import path:
task = self.fake_aws.import_image(
DiskContainers=[{
'Format': 'ova',
'UserBucket': {
'S3Bucket': 'zuul',
'S3Key': 'testfile',
}
}],
TagSpecifications=[{
'ResourceType': 'import-image-task',
'Tags': image_tags,
}])
image_id, snapshot_id = self.fake_aws.finish_import_image(task)
# Note that the resulting image and snapshot do not have tags
# applied, so we test the automatic retagging methods in the
# adapter.
image = self.ec2.Image(image_id)
self.assertEqual(image.state, 'available')
snap = self.ec2.Snapshot(snapshot_id)
self.assertEqual(snap.state, 'completed')
# Now that the leaked resources exist, start the worker and
# wait for it to clean them.
self.log.debug("Start cleanup worker")
self.launcher.cleanup_worker.start()
for _ in iterate_timeout(30, 'ami deletion'):
image = self.ec2.Image(image_id)
try:
# If this has a value the image was not deleted
if image.state == 'available':
# Definitely not deleted yet
pass
except AttributeError:
# Per AWS API, a recently deleted image is empty and
# looking at the state raises an AttributeError; see
# https://github.com/boto/boto3/issues/2531. The image
# was deleted, so we continue on here
break
time.sleep(1)
for _ in iterate_timeout(30, 'snapshot deletion'):
snap = self.ec2.Snapshot(snapshot_id)
try:
if snap.state == 'deleted':
break
except botocore.exceptions.ClientError:
# Probably not found
break
time.sleep(1)
@simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
def test_state_machines_instance(self):
self._test_state_machines("debian-normal")

View File

@@ -168,10 +168,8 @@ class LauncherBaseTestCase(ZuulTestCase):
self.mock_aws.start()
# Must start responses after mock_aws
self.useFixture(ImageMocksFixture())
self.s3 = boto3.resource('s3', region_name='us-west-2')
self.s3.create_bucket(
Bucket='zuul',
CreateBucketConfiguration={'LocationConstraint': 'us-west-2'})
self.s3 = boto3.resource('s3', region_name='us-east-1')
self.s3.create_bucket(Bucket='zuul')
self.addCleanup(self.mock_aws.stop)
self.patch(zuul.driver.aws.awsendpoint, 'CACHE_TTL', 1)

View File

@@ -13,6 +13,7 @@
# under the License.
import os
import time
import fixtures
@@ -21,14 +22,17 @@ import zuul.driver.openstack.openstackendpoint
from tests.fake_openstack import (
FakeOpenstackCloud,
FakeOpenstackFloatingIp,
FakeOpenstackPort,
FakeOpenstackProviderEndpoint,
)
from tests.base import (
FIXTURE_DIR,
ZuulTestCase,
simple_layout,
return_data,
driver_config,
iterate_timeout,
return_data,
simple_layout,
)
from tests.unit.test_launcher import ImageMocksFixture
from tests.unit.test_cloud_driver import BaseCloudDriverTest
@@ -88,9 +92,6 @@ class BaseOpenstackDriverTest(ZuulTestCase):
'CACHE_TTL', 1)
super().setUp()
def tearDown(self):
super().tearDown()
class TestOpenstackDriver(BaseOpenstackDriverTest, BaseCloudDriverTest):
def _assertProviderNodeAttributes(self, pnode):
@@ -133,8 +134,57 @@ class TestOpenstackDriver(BaseOpenstackDriverTest, BaseCloudDriverTest):
def test_openstack_diskimage(self):
self._test_diskimage()
# Openstack-driver specific tests
@simple_layout('layouts/openstack/nodepool.yaml', enable_nodepool=True)
def test_openstack_resource_cleanup(self):
self.waitUntilSettled()
self.launcher.cleanup_worker.INTERVAL = 1
conn = self.fake_cloud._getConnection()
system_id = self.launcher.system.system_id
tags = {
'zuul_system_id': system_id,
'zuul_node_uuid': '0000000042',
}
conn.create_server(
name="test",
meta=tags,
)
self.assertEqual(1, len(conn.list_servers()))
fip = FakeOpenstackFloatingIp(
id='42',
floating_ip_address='fake',
status='ACTIVE',
)
conn.cloud.floating_ips.append(fip)
self.assertEqual(1, len(conn.list_floating_ips()))
port = FakeOpenstackPort(
id='43',
status='DOWN',
device_owner='compute:foo',
)
conn.cloud.ports.append(port)
self.assertEqual(1, len(conn.list_ports()))
self.log.debug("Start cleanup worker")
self.launcher.cleanup_worker.start()
for _ in iterate_timeout(30, 'instance deletion'):
if not conn.list_servers():
break
time.sleep(1)
for _ in iterate_timeout(30, 'fip deletion'):
if not conn.list_floating_ips():
break
time.sleep(1)
for _ in iterate_timeout(30, 'port deletion'):
if not conn.list_ports():
break
time.sleep(1)
# Openstack-driver specific tests
class TestOpenstackDriverFloatingIp(BaseOpenstackDriverTest,
BaseCloudDriverTest):
# This test is for nova-net clouds with floating ips that require

View File

@@ -2914,7 +2914,8 @@ class TenantParser(object):
provider = connection.driver.getProvider(
connection, tenant.name,
provider_config.canonical_name,
flat_config)
flat_config,
parse_context.scheduler.system.system_id)
shadow_layout.addProvider(provider)
for e in parsed_config.queue_errors:

View File

@@ -28,9 +28,10 @@ class AwsDriver(Driver, EndpointCacheMixin,
return awsconnection.AwsConnection(self, name, config)
def getProvider(self, connection, tenant_name, canonical_name,
provider_config):
provider_config, system_id):
return awsprovider.AwsProvider(
self, connection, tenant_name, canonical_name, provider_config)
self, connection, tenant_name, canonical_name, provider_config,
system_id)
def getProviderClass(self):
return awsprovider.AwsProvider
@@ -52,7 +53,8 @@ class AwsDriver(Driver, EndpointCacheMixin,
])
return self.getEndpointById(
endpoint_id,
create_args=(self, provider.connection, provider.region))
create_args=(self, provider.connection, provider.region,
provider.system_id))
def stop(self):
self.stopEndpoints()

View File

@@ -363,9 +363,9 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
IMAGE_UPLOAD_SLEEP = 30
LAUNCH_TEMPLATE_PREFIX = 'zuul-launch-template'
def __init__(self, driver, connection, region):
def __init__(self, driver, connection, region, system_id):
name = f'{connection.connection_name}-{region}'
super().__init__(driver, connection, name)
super().__init__(driver, connection, name, system_id)
self.log = logging.getLogger(f"zuul.aws.{self.name}")
self.region = region
@@ -409,6 +409,13 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
self.aws_quotas = self.aws.client("service-quotas")
self.ebs_client = self.aws.client('ebs')
self.provider_label_template_names = {}
# In listResources, we reconcile AMIs which appear to be
# imports but have no nodepool tags, however it's possible
# that these aren't nodepool images. If we determine that's
# the case, we'll add their ids here so we don't waste our
# time on that again.
self.not_our_images = set()
self.not_our_snapshots = set()
def startEndpoint(self):
self._running = True
@@ -496,7 +503,14 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
def postConfig(self, provider):
self._createLaunchTemplates(provider)
def listResources(self, bucket_name):
def listResources(self, providers):
bucket_names = set()
for provider in providers:
if bn := provider.object_storage.get('bucket-name'):
bucket_names.add(bn)
self._tagSnapshots()
self._tagAmis()
for host in self._listHosts():
try:
if host['State'].lower() in [
@@ -540,7 +554,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
continue
yield AwsResource(tag_list_to_dict(snap.get('Tags')),
AwsResource.TYPE_SNAPSHOT, snap['SnapshotId'])
if bucket_name:
for bucket_name in bucket_names:
for obj in self._listObjects(bucket_name):
with self.non_mutating_rate_limiter:
try:
@@ -549,10 +563,10 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
except botocore.exceptions.ClientError:
continue
yield AwsResource(tag_list_to_dict(tags['TagSet']),
AwsResource.TYPE_OBJECT, obj.key)
AwsResource.TYPE_OBJECT, obj.key,
bucket_name=bucket_name)
def deleteResource(self, resource, bucket_name):
def deleteResource(self, resource):
self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
if resource.type == AwsResource.TYPE_HOST:
self._releaseHost(resource.id, immediate=True)
@ -565,7 +579,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
if resource.type == AwsResource.TYPE_SNAPSHOT:
self._deleteSnapshot(resource.id)
if resource.type == AwsResource.TYPE_OBJECT:
self._deleteObject(bucket_name, resource.id)
self._deleteObject(resource.bucket_name, resource.id)
def listInstances(self):
volumes = {}
@@ -901,14 +915,14 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
# Local implementation below
def _tagAmis(self, provider_name, not_our_images):
def _tagAmis(self):
# There is no way to tag imported AMIs, so this routine
# "eventually" tags them. We look for any AMIs without tags
# and we copy the tags from the associated snapshot or image
# import task.
to_examine = []
for ami in self._listAmis():
if ami['ImageId'] in not_our_images:
if ami['ImageId'] in self.not_our_images:
continue
if ami.get('Tags'):
continue
@@ -921,7 +935,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
# This was an import image (not snapshot) so let's
# try to find tags from the import task.
tags = tag_list_to_dict(task.get('Tags'))
if (tags.get('zuul_provider_name') == provider_name):
if (tags.get('zuul_system_id') == self.system_id):
# Copy over tags
self.log.debug(
"Copying tags from import task %s to AMI",
@@ -936,16 +950,16 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
# any tags from the snapshot import task, otherwise, mark
# it as an image we can ignore in future runs.
if len(ami.get('BlockDeviceMappings', [])) < 1:
not_our_images.add(ami['ImageId'])
self.not_our_images.add(ami['ImageId'])
continue
bdm = ami['BlockDeviceMappings'][0]
ebs = bdm.get('Ebs')
if not ebs:
not_our_images.add(ami['ImageId'])
self.not_our_images.add(ami['ImageId'])
continue
snapshot_id = ebs.get('SnapshotId')
if not snapshot_id:
not_our_images.add(ami['ImageId'])
self.not_our_images.add(ami['ImageId'])
continue
to_examine.append((ami, snapshot_id))
if not to_examine:
@@ -963,12 +977,11 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
task_map[task_snapshot_id] = task['Tags']
for ami, snapshot_id in to_examine:
tags = task_map.get(snapshot_id)
tags = tag_list_to_dict(task_map.get(snapshot_id))
if not tags:
not_our_images.add(ami['ImageId'])
self.not_our_images.add(ami['ImageId'])
continue
metadata = tag_list_to_dict(tags)
if (metadata.get('zuul_provider_name') == provider_name):
if (tags.get('zuul_system_id') == self.system_id):
# Copy over tags
self.log.debug(
"Copying tags from import task to image %s",
@ -976,15 +989,15 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
with self.rate_limiter:
self.ec2_client.create_tags(
Resources=[ami['ImageId']],
Tags=task['Tags'])
Tags=task_map.get(snapshot_id))
else:
not_our_images.add(ami['ImageId'])
self.not_our_images.add(ami['ImageId'])
def _tagSnapshots(self, provider_name, not_our_snapshots):
def _tagSnapshots(self):
# See comments for _tagAmis
to_examine = []
for snap in self._listSnapshots():
if snap['SnapshotId'] in not_our_snapshots:
if snap['SnapshotId'] in self.not_our_snapshots:
continue
try:
if snap.get('Tags'):
@@ -1004,7 +1017,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
# This was an import image (not snapshot) so let's
# try to find tags from the import task.
tags = tag_list_to_dict(task.get('Tags'))
if (tags.get('zuul_provider_name') == provider_name):
if (tags.get('zuul_system_id') == self.system_id):
# Copy over tags
self.log.debug(
f"Copying tags from import task {task_id}"
@@ -1034,12 +1047,11 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
task_map[task_snapshot_id] = task['Tags']
for snap in to_examine:
tags = task_map.get(snap['SnapshotId'])
tags = tag_list_to_dict(task_map.get(snap['SnapshotId']))
if not tags:
not_our_snapshots.add(snap['SnapshotId'])
self.not_our_snapshots.add(snap['SnapshotId'])
continue
metadata = tag_list_to_dict(tags)
if (metadata.get('zuul_provider_name') == provider_name):
if (tags.get('zuul_system_id') == self.system_id):
# Copy over tags
self.log.debug(
"Copying tags from import task to snapshot %s",
@@ -1047,9 +1059,9 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
with self.rate_limiter:
self.ec2_client.create_tags(
Resources=[snap['SnapshotId']],
Tags=tags)
Tags=task_map.get(snap['SnapshotId']))
else:
not_our_snapshots.add(snap['SnapshotId'])
self.not_our_snapshots.add(snap['SnapshotId'])
def _getImportImageTask(self, task_id):
paginator = self.ec2_client.get_paginator(
@@ -1419,7 +1431,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
self.log.info("Creating launch templates")
tags = {
'zuul_managed': 'true',
'zuul_system_id': self.system_id,
'zuul_provider_name': provider.canonical_name,
}
existing_templates = dict() # for clean up and avoid creation attempt
@@ -1512,14 +1524,13 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
for template_name, template in existing_templates.items():
if template_name not in configured_templates:
# check if the template was created by the current provider
tags = template.get('Tags', [])
for tag in tags:
if (tag['Key'] == 'zuul_provider_name' and
tag['Value'] == provider.canonical_name):
self.ec2_client.delete_launch_template(
LaunchTemplateName=template_name)
self.log.debug("Deleted unused launch template: %s",
template_name)
tags = tag_list_to_dict(template.get('Tags', []))
if (tags.get('zuul_system_id') == self.system_id and
tags.get('zuul_provider_name') == provider.canonical_name):
self.ec2_client.delete_launch_template(
LaunchTemplateName=template_name)
self.log.debug("Deleted unused launch template: %s",
template_name)
def _getLaunchTemplateName(self, args):
hasher = hashlib.sha256()
@@ -1793,7 +1804,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
ids = []
for (del_id, log) in records:
ids.append(del_id)
log.debug(f"Deleting instance {del_id}")
log.debug("Deleting instance %s", del_id)
count = len(ids)
with self.rate_limiter(log.debug, f"Deleted {count} instances"):
self.ec2_client.terminate_instances(InstanceIds=ids)
@@ -1805,7 +1816,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
ids = []
for (del_id, log) in records:
ids.append(del_id)
log.debug(f"Releasing host {del_id}")
log.debug("Releasing host %s", del_id)
count = len(ids)
with self.rate_limiter(log.debug, f"Released {count} hosts"):
self.ec2_client.release_hosts(HostIds=ids)
@@ -1817,7 +1828,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
if host['HostId'] == external_id:
break
else:
log.warning(f"Host not found when releasing {external_id}")
log.warning("Host not found when releasing %s", external_id)
return None
if immediate:
with self.rate_limiter(log.debug, "Released host"):
@@ -1835,7 +1846,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
if instance['InstanceId'] == external_id:
break
else:
log.warning(f"Instance not found when deleting {external_id}")
log.warning("Instance not found when deleting %s", external_id)
return None
if immediate:
with self.rate_limiter(log.debug, "Deleted instance"):
@@ -1851,11 +1862,17 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
if volume['VolumeId'] == external_id:
break
else:
self.log.warning(f"Volume not found when deleting {external_id}")
self.log.warning("Volume not found when deleting %s", external_id)
return None
with self.rate_limiter(self.log.debug, "Deleted volume"):
self.log.debug(f"Deleting volume {external_id}")
self.ec2_client.delete_volume(VolumeId=volume['VolumeId'])
try:
self.ec2_client.delete_volume(VolumeId=volume['VolumeId'])
except botocore.exceptions.ClientError as error:
if error.response['Error']['Code'] == 'NotFound':
self.log.warning(
"Volume not found when deleting %s", external_id)
return None
return volume
def _deleteAmi(self, external_id):
@@ -1863,11 +1880,17 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
if ami['ImageId'] == external_id:
break
else:
self.log.warning(f"AMI not found when deleting {external_id}")
self.log.warning("AMI not found when deleting %s", external_id)
return None
with self.rate_limiter:
self.log.debug(f"Deleting AMI {external_id}")
self.ec2_client.deregister_image(ImageId=ami['ImageId'])
try:
self.ec2_client.deregister_image(ImageId=ami['ImageId'])
except botocore.exceptions.ClientError as error:
if error.response['Error']['Code'] == 'NotFound':
self.log.warning(
"AMI not found when deleting %s", external_id)
return None
return ami
def _deleteSnapshot(self, external_id):
@@ -1875,14 +1898,21 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
if snap['SnapshotId'] == external_id:
break
else:
self.log.warning(f"Snapshot not found when deleting {external_id}")
self.log.warning("Snapshot not found when deleting %s",
external_id)
return None
with self.rate_limiter:
self.log.debug(f"Deleting Snapshot {external_id}")
self.ec2_client.delete_snapshot(SnapshotId=snap['SnapshotId'])
try:
self.ec2_client.delete_snapshot(SnapshotId=snap['SnapshotId'])
except botocore.exceptions.ClientError as error:
if error.response['Error']['Code'] == 'NotFound':
self.log.warning(
"Snapshot not found when deleting %s", external_id)
return None
return snap
def _deleteObject(self, bucket_name, external_id):
with self.rate_limiter:
self.log.debug(f"Deleting object {external_id}")
self.log.debug("Deleting object %s", external_id)
self.s3.Object(bucket_name, external_id).delete()

View File

@@ -76,6 +76,7 @@ class AwsResource(statemachine.Resource):
TYPE_VOLUME = 'volume'
TYPE_OBJECT = 'object'
def __init__(self, metadata, type, id):
def __init__(self, metadata, type, id, bucket_name=None):
super().__init__(metadata, type)
self.id = id
self.bucket_name = bucket_name

View File

@@ -236,19 +236,6 @@ class AwsProvider(BaseProvider, subclass_id='aws'):
log = logging.getLogger("zuul.AwsProvider")
schema = AwsProviderSchema().getProviderSchema()
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
# In listResources, we reconcile AMIs which appear to be
# imports but have no nodepool tags, however it's possible
# that these aren't nodepool images. If we determine that's
# the case, we'll add their ids here so we don't waste our
# time on that again. We do not need to serialize these;
# these are ephemeral caches.
self._set(
not_our_images=set(),
not_our_snapshots=set(),
)
@property
def endpoint(self):
ep = getattr(self, '_endpoint', None)
@@ -305,19 +292,6 @@ class AwsProvider(BaseProvider, subclass_id='aws'):
def listInstances(self):
return self.endpoint.listInstances()
def listResources(self):
bucket_name = self.object_storage.get('bucket-name')
self.endpoint._tagSnapshots(
self.tenant_scoped_name, self.not_our_snapshots)
self.endpoint._tagAmis(
self.tenant_scoped_name, self.not_our_images)
return self.endpoint.listResources(bucket_name)
def deleteResource(self, resource):
bucket_name = self.object_storage.get('bucket-name')
self.endpoint.deleteResource(resource, bucket_name)
def getQuotaLimits(self):
# Get the instance and volume types that this provider handles
instance_types = {}

View File

@@ -33,9 +33,10 @@ class OpenstackDriver(Driver, EndpointCacheMixin,
return openstackconnection.OpenstackConnection(self, name, config)
def getProvider(self, connection, tenant_name, canonical_name,
provider_config):
provider_config, system_id):
return openstackprovider.OpenstackProvider(
self, connection, tenant_name, canonical_name, provider_config)
self, connection, tenant_name, canonical_name, provider_config,
system_id)
def getProviderClass(self):
return openstackprovider.OpenstackProvider
@@ -46,17 +47,19 @@ class OpenstackDriver(Driver, EndpointCacheMixin,
def getProviderNodeClass(self):
return openstackmodel.OpenstackProviderNode
def _getEndpoint(self, connection, region):
def _getEndpoint(self, connection, region, system_id):
region_str = region or ''
endpoint_id = '/'.join([
urllib.parse.quote_plus(connection.connection_name),
urllib.parse.quote_plus(region_str),
])
return self.getEndpointById(endpoint_id,
create_args=(self, connection, region))
return self.getEndpointById(
endpoint_id,
create_args=(self, connection, region, system_id))
def getEndpoint(self, provider):
return self._getEndpoint(provider.connection, provider.region)
return self._getEndpoint(provider.connection, provider.region,
provider.system_id)
def stop(self):
self.stopEndpoints()

View File

@@ -357,9 +357,9 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):
IMAGE_UPLOAD_SLEEP = 30
def __init__(self, driver, connection, region):
def __init__(self, driver, connection, region, system_id):
name = f'{connection.connection_name}-{region}'
super().__init__(driver, connection, name)
super().__init__(driver, connection, name, system_id)
self.log = logging.getLogger(f"zuul.openstack.{self.name}")
self.region = region
@@ -411,7 +411,7 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):
self.api_executor.shutdown()
self._running = False
def listResources(self):
def listResources(self, providers):
for server in self._listServers():
if server['status'].lower() == 'deleted':
continue
@@ -422,9 +422,12 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):
# automatic resource cleanup in cleanupLeakedResources because
# openstack doesn't store metadata on those objects, so we
# call internal cleanup methods here.
if self.provider.port_cleanup_interval:
self._cleanupLeakedPorts()
if self.provider.clean_floating_ips:
intervals = [p.port_cleanup_interval for p in providers
if p.port_cleanup_interval]
interval = min(intervals or [0])
if interval:
self._cleanupLeakedPorts(interval)
if any([p.floating_ip_cleanup for p in providers]):
self._cleanupFloatingIps()
def deleteResource(self, resource):
@@ -920,7 +923,7 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):
ret.append(p)
return ret
def _cleanupLeakedPorts(self):
def _cleanupLeakedPorts(self, interval):
if not self._last_port_cleanup:
self._last_port_cleanup = time.monotonic()
ports = self._listPorts(status='DOWN')
@@ -930,7 +933,7 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):
# Return if not enough time has passed between cleanup
last_check_in_secs = int(time.monotonic() - self._last_port_cleanup)
if last_check_in_secs <= self.provider.port_cleanup_interval:
if last_check_in_secs <= interval:
return
ports = self._listPorts(status='DOWN')
@@ -944,15 +947,15 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):
self._deletePort(port_id)
except Exception:
self.log.exception("Exception deleting port %s in %s:",
port_id, self.provider.name)
port_id, self.name)
else:
removed_count += 1
self.log.debug("Removed DOWN port %s in %s",
port_id, self.provider.name)
port_id, self.name)
if self._statsd and removed_count:
key = 'nodepool.provider.%s.leaked.ports' % (self.provider.name)
self._statsd.incr(key, removed_count)
# if self._statsd and removed_count:
# key = 'nodepool.provider.%s.leaked.ports' % (self.name)
# self._statsd.incr(key, removed_count)
self._last_port_cleanup = time.monotonic()
@@ -973,10 +976,10 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):
# indicate something happened.
if type(did_clean) is bool:
did_clean = 1
if self._statsd:
key = ('nodepool.provider.%s.leaked.floatingips'
% self.provider.name)
self._statsd.incr(key, did_clean)
# if self._statsd:
# key = ('nodepool.provider.%s.leaked.floatingips'
# % self.name)
# self._statsd.incr(key, did_clean)
def getConsoleLog(self, label, external_id):
if not label.console_log:

View File

@@ -167,6 +167,8 @@ class OpenstackProviderSchema(BaseProviderSchema):
openstack_provider_schema = vs.Schema({
Optional('region'): Nullable(str),
Optional('resource-limits', default=dict()): resource_limits,
Optional('floating-ip-cleanup', default=False): bool,
Optional('port-cleanup-interval', default=600): int,
})
return assemble(
@@ -182,19 +184,6 @@ class OpenstackProvider(BaseProvider, subclass_id='openstack'):
log = logging.getLogger("zuul.OpenstackProvider")
schema = OpenstackProviderSchema().getProviderSchema()
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
# In listResources, we reconcile AMIs which appear to be
# imports but have no nodepool tags, however it's possible
# that these aren't nodepool images. If we determine that's
# the case, we'll add their ids here so we don't waste our
# time on that again. We do not need to serialize these;
# these are ephemeral caches.
self._set(
not_our_images=set(),
not_our_snapshots=set(),
)
@property
def endpoint(self):
ep = getattr(self, '_endpoint', None)
@@ -207,7 +196,7 @@ class OpenstackProvider(BaseProvider, subclass_id='openstack'):
# We are not fully constructed yet at this point, so we need
# to peek to get the region and endpoint.
region = provider_config.get('region')
endpoint = connection.driver._getEndpoint(connection, region)
endpoint = connection.driver._getEndpoint(connection, region, None)
return OpenstackProviderImage(
image_config, provider_config,
image_format=endpoint.getImageFormat())
@@ -245,14 +234,6 @@ class OpenstackProvider(BaseProvider, subclass_id='openstack'):
def listInstances(self):
return self.endpoint.listInstances()
def listResources(self):
# TODO: implement
return []
def deleteResource(self, resource):
# TODO: implement
pass
def getQuotaLimits(self):
cloud = self.endpoint.getQuotaLimits()
zuul = QuotaInformation(default=math.inf, **self.resource_limits)

View File

@@ -81,6 +81,10 @@ def scores_for_label(label_cname, candidate_names):
}
def endpoint_score(launcher_name, endpoint):
return mmh3.hash(f"{launcher_name}-{endpoint.canonical_name}",
signed=False)
class NodesetRequestError(Exception):
"""Errors that should lead to the request being declined."""
pass
@@ -238,8 +242,10 @@ class EndpointUploadJob:
raise Exception(
f"Unable to find image {self.upload.canonical_name}")
# TODO: add upload id, etc
metadata = {}
metadata = {
'zuul_system_id': self.launcher.system.system_id,
'zuul_upload_uuid': self.upload.uuid,
}
image_name = f'{provider_image.name}-{self.artifact.uuid}'
external_id = provider.uploadImage(
provider_image, image_name, self.path, self.artifact.format,
@@ -667,6 +673,133 @@ class NodescanWorker:
last_unready_check = time.monotonic()
class CleanupWorker:
# Delay 60 seconds between iterations
INTERVAL = 60
log = logging.getLogger("zuul.Launcher")
def __init__(self, launcher):
self.launcher = launcher
self.wake_event = threading.Event()
self.possibly_leaked_nodes = {}
self.possibly_leaked_uploads = {}
self._running = False
self.thread = None
def start(self):
self.log.debug("Starting cleanup worker thread")
self._running = True
self.thread = threading.Thread(target=self.run,
name="CleanupWorker")
self.thread.start()
def stop(self):
self.log.debug("Stopping cleanup worker")
self._running = False
self.wake_event.set()
def join(self):
self.log.debug("Joining cleanup thread")
if self.thread:
self.thread.join()
self.log.debug("Joined cleanup thread")
def run(self):
while self._running:
# Wait before performing the first cleanup
self.wake_event.wait(self.INTERVAL)
self.wake_event.clear()
try:
self._run()
except Exception:
self.log.exception("Error in cleanup worker:")
def getMatchingEndpoints(self):
all_launchers = {
c.hostname: c for c in COMPONENT_REGISTRY.registry.all("launcher")}
for endpoint in self.launcher.endpoints.values():
candidate_launchers = {
n: c for n, c in all_launchers.items()
if not c.connection_filter
or endpoint.connection.connection_name in c.connection_filter}
candidate_names = set(candidate_launchers)
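# Rendezvous hashing: every launcher computes the same
# deterministic scores, so exactly one running launcher
# claims each endpoint without any locking.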
launcher_scores = {endpoint_score(n, endpoint): n
for n in candidate_names}
sorted_scores = sorted(launcher_scores.items())
for score, launcher_name in sorted_scores:
launcher = candidate_launchers.get(launcher_name)
if not launcher:
# Launcher is no longer online
continue
if launcher.state != launcher.RUNNING:
continue
if launcher.hostname == self.launcher.component_info.hostname:
yield endpoint
break
def _run(self):
for endpoint in self.getMatchingEndpoints():
try:
self.cleanupLeakedResources(endpoint)
except Exception:
self.log.exception("Error in cleanup worker:")
def cleanupLeakedResources(self, endpoint):
newly_leaked_nodes = {}
newly_leaked_uploads = {}
# Get a list of all providers that share this endpoint. This
# is because some providers may store resources like image
# uploads in multiple per-provider locations.
providers = [
p for p in self.launcher._getProviders()
if p.getEndpoint().canonical_name == endpoint.canonical_name
]
for resource in endpoint.listResources(providers):
if (resource.metadata.get('zuul_system_id') !=
self.launcher.system.system_id):
continue
node_id = resource.metadata.get('zuul_node_uuid')
upload_id = resource.metadata.get('zuul_upload_uuid')
if node_id and self.launcher.api.getProviderNode(node_id) is None:
newly_leaked_nodes[node_id] = resource
if node_id in self.possibly_leaked_nodes:
# We've seen this twice now, so it's not a race
# condition.
try:
endpoint.deleteResource(resource)
# if self._statsd:
# key = ('nodepool.provider.%s.leaked.%s'
# % (self.provider.name,
# resource.plural_metric_name))
# self._statsd.incr(key, 1)
except Exception:
self.log.exception("Unable to delete leaked "
f"resource for node {node_id}")
if (upload_id and
self.launcher.image_upload_registry.getItem(
upload_id) is None):
newly_leaked_uploads[upload_id] = resource
if upload_id in self.possibly_leaked_uploads:
# We've seen this twice now, so it's not a race
# condition.
try:
endpoint.deleteResource(resource)
# if self._statsd:
# key = ('nodepool.provider.%s.leaked.%s'
# % (self.provider.name,
# resource.plural_metric_name))
# self._statsd.incr(key, 1)
except Exception:
self.log.exception(
"Unable to delete leaked "
f"resource for upload {upload_id}")
self.possibly_leaked_nodes = newly_leaked_nodes
self.possibly_leaked_uploads = newly_leaked_uploads
class Launcher:
log = logging.getLogger("zuul.Launcher")
# Max. time to wait for a cache to sync
@@ -678,6 +811,11 @@ class Launcher:
def __init__(self, config, connections):
self._running = True
# The cleanup method requires some extra AWS mocks that are
# not enabled in most tests, so we allow the test suite to
# disable it.
# TODO: switch basic launcher tests to openstack and remove.
self._start_cleanup = True
self.config = config
self.connections = connections
self.repl = None
@@ -740,7 +878,7 @@ class Launcher:
self.tenant_layout_state = LayoutStateStore(
self.zk_client, self._layoutUpdatedCallback)
self.layout_providers_store = LayoutProvidersStore(
self.zk_client, self.connections)
self.zk_client, self.connections, self.system.system_id)
self.local_layout_state = {}
self.image_build_registry = ImageBuildRegistry(
@@ -768,6 +906,7 @@ class Launcher:
max_workers=10,
thread_name_prefix="UploadWorker",
)
self.cleanup_worker = CleanupWorker(self)
def _layoutUpdatedCallback(self):
self.layout_updated_event.set()
@@ -1477,6 +1616,11 @@ class Launcher:
raise ProviderNodeError(
f"Unable to find {provider_name} in tenant {tenant_name}")
def _getProviders(self):
for providers in self.tenant_providers.values():
for provider in providers:
yield provider
def _hasProvider(self, node):
try:
self._getProviderForNode(node)
@@ -1506,6 +1650,10 @@ class Launcher:
self.log.debug("Starting launcher thread")
self.launcher_thread.start()
if self._start_cleanup:
self.log.debug("Starting cleanup thread")
self.cleanup_worker.start()
def stop(self):
self.log.debug("Stopping launcher")
self._running = False
@@ -1514,10 +1662,12 @@ class Launcher:
self.stopRepl()
self._command_running = False
self.command_socket.stop()
self.connections.stop()
self.cleanup_worker.stop()
self.cleanup_worker.join()
self.upload_executor.shutdown()
self.endpoint_upload_executor.shutdown()
self.nodescan_worker.stop()
self.connections.stop()
# Endpoints are stopped by drivers
self.log.debug("Stopped launcher")

View File

@@ -118,10 +118,11 @@ class BaseProviderEndpoint(metaclass=abc.ABCMeta):
the unit of visibility of instances, VPCs, images, etc.
"""
def __init__(self, driver, connection, name):
def __init__(self, driver, connection, name, system_id):
self.driver = driver
self.connection = connection
self.name = name
self.system_id = system_id
self.start_lock = threading.Lock()
self.started = False
self.stopped = False
@@ -210,7 +211,8 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin,
def __init__(self, *args):
super().__init__()
if args:
(driver, connection, tenant_name, canonical_name, config) = args
(driver, connection, tenant_name, canonical_name, config,
system_id) = args
config = config.copy()
config.pop('_source_context')
config.pop('_start_mark')
@@ -223,6 +225,7 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin,
tenant_name=tenant_name,
canonical_name=canonical_name,
config=config,
system_id=system_id,
**parsed_config,
)
@@ -231,7 +234,7 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin,
f"canonical_name={self.canonical_name}>")
@classmethod
def fromZK(cls, context, path, connections):
def fromZK(cls, context, path, connections, system_id):
"""Deserialize a Provider (subclass) from ZK.
To deserialize a Provider from ZK, pass the connection
@@ -246,8 +249,11 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin,
extra = {'connections': connections}
obj = cls._fromRaw(raw_data, zstat, extra)
connection = connections.connections[obj.connection_name]
obj._set(connection=connection,
driver=connection.driver)
obj._set(
connection=connection,
driver=connection.driver,
system_id=system_id,
)
return obj
def getProviderSchema(self):
@@ -582,6 +588,10 @@ class EndpointCacheMixin:
self.endpoints[endpoint_id] = endpoint
return endpoint
def getEndpoints(self):
with self.endpoints_lock:
return list(self.endpoints.values())
def stopEndpoints(self):
with self.endpoints_lock:
for endpoint in self.endpoints.values():

View File

@@ -145,3 +145,6 @@ class Resource:
self.type = type
self.plural_metric_name = type + 's'
self.metadata = metadata
def __repr__(self):
return f'<{self.__class__.__name__} {self.type} {self.metadata}>'

View File

@@ -345,7 +345,7 @@ class Scheduler(threading.Thread):
self.zk_client,
self.layout_update_event.set)
self.layout_providers_store = LayoutProvidersStore(
self.zk_client, self.connections)
self.zk_client, self.connections, self.system.system_id)
self.local_layout_state = {}
command_socket = get_default(

View File

@@ -1121,7 +1121,6 @@ class ZuulWebAPI(object):
def __init__(self, zuulweb):
self.zuulweb = zuulweb
self.zk_client = zuulweb.zk_client
self.system = ZuulSystem(self.zk_client)
self.zk_nodepool = ZooKeeperNodepool(self.zk_client,
enable_node_cache=True)
self.status_caches = {}
@@ -2170,7 +2169,7 @@ class ZuulWebAPI(object):
if not (node.user_data and
isinstance(node.user_data, dict) and
node.user_data.get('zuul_system') ==
self.system.system_id and
self.zuulweb.system.system_id and
node.tenant_name == tenant_name):
continue
node_data = {}
@@ -2960,6 +2959,7 @@ class ZuulWeb(object):
self.monitoring_server.start()
self.component_registry = COMPONENT_REGISTRY.create(self.zk_client)
self.system = ZuulSystem(self.zk_client)
self.system_config_thread = None
self.system_config_cache_wake_event = threading.Event()
@@ -2986,7 +2986,7 @@ class ZuulWeb(object):
self.tenant_providers = {}
self.layout_providers_store = LayoutProvidersStore(
self.zk_client, self.connections)
self.zk_client, self.connections, self.system.system_id)
self.image_build_registry = ImageBuildRegistry(self.zk_client)
self.image_upload_registry = ImageUploadRegistry(self.zk_client)
self.nodes_cache = LockableZKObjectCache(

View File

@@ -225,9 +225,10 @@ class LayoutProvidersStore(ZooKeeperSimpleBase):
tenant_root = "/zuul/tenant"
def __init__(self, client, connections):
def __init__(self, client, connections, system_id):
super().__init__(client)
self.connections = connections
self.system_id = system_id
def get(self, context, tenant_name):
path = f"{self.tenant_root}/{tenant_name}/provider"
@@ -244,7 +245,7 @@ class LayoutProvidersStore(ZooKeeperSimpleBase):
for provider_name in provider_names:
provider_path = (f"{path}/{provider_name}/config")
yield BaseProvider.fromZK(
context, provider_path, self.connections
context, provider_path, self.connections, self.system_id
)
def set(self, context, tenant_name, providers):