From e7d746c42d5c6d638fc372addbf3c6be389b42ef Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Wed, 9 Apr 2025 14:15:14 -0700 Subject: [PATCH] Add niz cleanup This adds leaked resource cleanup, using rendezvous hashing to resolve which launcher should clean each endpoint. There is no secondary locking since a collision on cleanup is not critical. This adds more testing than nodepool had for leaked fip and port cleanup in openstack. Options to configure cleanup behavior are added. Note that the options are set in the providers, but they really affect endpoints. This is an unavoidable mismatch, so we use the most generous values. This means that any openstack provider on an endpoint can enable fip/port cleanup for that endpoint. The AWS tests used different regions which caused issues with resources being partitioned. Standardize on us-east-1 since that was what most used. AWS cleanup requires a bucket name to clean up leaked image upload objects. This can be configured on a provider, and is an unavoidable mismatch for endpoint cleanups. To resolve this, we look at the bucket names for all providers for an endpoint when we perform resource cleanup. The shutdown sequence for moto is adjusted so that it happens at the end of the normal test shutdown sequence. The system_id is added to the provider and endpoint objects so that they can construct an appropriate metadata dictionary at any point. Change-Id: I373df424e7db3d5ac22cdcd2d29a243ed960201d --- tests/base.py | 1 + tests/fake_aws.py | 8 +- tests/fake_openstack.py | 23 ++ .../aws/nodepool-image-ebs-direct.yaml | 8 +- .../layouts/aws/nodepool-image-image.yaml | 8 +- .../layouts/aws/nodepool-image-snapshot.yaml | 8 +- .../fixtures/layouts/openstack/nodepool.yaml | 2 + tests/unit/test_aws_driver.py | 246 +++++++++++++++++- tests/unit/test_launcher.py | 6 +- tests/unit/test_openstack_driver.py | 62 ++++- zuul/configloader.py | 3 +- zuul/driver/aws/__init__.py | 8 +- zuul/driver/aws/awsendpoint.py | 128 +++++---- zuul/driver/aws/awsmodel.py | 3 +- zuul/driver/aws/awsprovider.py | 26 -- zuul/driver/openstack/__init__.py | 15 +- zuul/driver/openstack/openstackendpoint.py | 37 +-- zuul/driver/openstack/openstackprovider.py | 25 +- zuul/launcher/server.py | 158 ++++++++++- zuul/provider/__init__.py | 20 +- zuul/provider/statemachine.py | 3 + zuul/scheduler.py | 2 +- zuul/web/__init__.py | 6 +- zuul/zk/layout.py | 5 +- 24 files changed, 643 insertions(+), 168 deletions(-) diff --git a/tests/base.py b/tests/base.py index 0ac35232db..4a7b865528 100644 --- a/tests/base.py +++ b/tests/base.py @@ -2551,6 +2551,7 @@ class ZuulTestCase(BaseTestCase): launcher = TestLauncher( self.config, launcher_connections) + launcher._start_cleanup = False launcher.start() return launcher diff --git a/tests/fake_aws.py b/tests/fake_aws.py index 70f3e339c8..27c1730a49 100644 --- a/tests/fake_aws.py +++ b/tests/fake_aws.py @@ -141,8 +141,8 @@ class FakeAws: def __init__(self): self.tasks = {} - self.ec2 = boto3.resource('ec2', region_name='us-west-2') - self.ec2_client = boto3.client('ec2', region_name='us-west-2') + self.ec2 = boto3.resource('ec2', region_name='us-east-1') + self.ec2_client = boto3.client('ec2', region_name='us-east-1') self.fail_import_count = 0 def import_snapshot(self, *args, **kw): @@ -165,7 +165,7 @@ class FakeAws: # Make a Volume to simulate the import finishing volume = self.ec2_client.create_volume( Size=80, - AvailabilityZone='us-west-2') + AvailabilityZone='us-east-1') snap_id = self.ec2_client.create_snapshot( VolumeId=volume['VolumeId'], 
)["SnapshotId"] @@ -207,7 +207,7 @@ class FakeAws: volume = self.ec2_client.create_volume( Size=80, - AvailabilityZone='us-west-2') + AvailabilityZone='us-east-1') snap_id = self.ec2_client.create_snapshot( VolumeId=volume['VolumeId'], Description=f'imported volume import-ami-{task_id}', diff --git a/tests/fake_openstack.py b/tests/fake_openstack.py index 5fde0bb8b2..16e986112b 100644 --- a/tests/fake_openstack.py +++ b/tests/fake_openstack.py @@ -76,6 +76,10 @@ class FakeOpenstackFloatingIp(FakeOpenstackObject): } +class FakeOpenstackPort(FakeOpenstackObject): + pass + + class FakeOpenstackCloud: log = logging.getLogger("zuul.FakeOpenstackCloud") @@ -103,6 +107,7 @@ class FakeOpenstackCloud: name='fake-network', ) ] + self.ports = [] def _getConnection(self): return FakeOpenstackConnection(self) @@ -156,12 +161,29 @@ class FakeOpenstackConnection: def _has_floating_ips(self): return False + def delete_unattached_floating_ips(self): + for x in self.cloud.floating_ips: + if not getattr(x, 'port', None): + self.cloud.floating_ips.remove(x) + def list_flavors(self, get_extra=False): return self.cloud.flavors def list_volumes(self): return self.cloud.volumes + def list_ports(self, filters=None): + if filters and filters.get('status'): + return [p for p in self.cloud.ports + if p.status == filters['status']] + return self.cloud.ports + + def delete_port(self, port_id): + for x in self.cloud.ports: + if x.id == port_id: + self.cloud.ports.remove(x) + return + def get_network(self, name_or_id, filters=None): for x in self.cloud.networks: if x.id == name_or_id or x.name == name_or_id: @@ -198,6 +220,7 @@ class FakeOpenstackConnection: addresses=addresses, interface_ip=interface_ip, flavor=flavor, + metadata=meta, ) server = FakeOpenstackServer(**args) self.cloud.servers.append(server) diff --git a/tests/fixtures/layouts/aws/nodepool-image-ebs-direct.yaml b/tests/fixtures/layouts/aws/nodepool-image-ebs-direct.yaml index b5f7d8c4db..e623995d7f 100644 --- a/tests/fixtures/layouts/aws/nodepool-image-ebs-direct.yaml +++ b/tests/fixtures/layouts/aws/nodepool-image-ebs-direct.yaml @@ -69,13 +69,13 @@ - name: debian-local - section: - name: aws-us-west-2 + name: aws-us-east-1 parent: aws-base - region: us-west-2 + region: us-east-1 - provider: - name: aws-us-west-2-main - section: aws-us-west-2 + name: aws-us-east-1-main + section: aws-us-east-1 labels: - name: debian-local-normal key-name: zuul diff --git a/tests/fixtures/layouts/aws/nodepool-image-image.yaml b/tests/fixtures/layouts/aws/nodepool-image-image.yaml index f4e7b5f8ac..ca35721524 100644 --- a/tests/fixtures/layouts/aws/nodepool-image-image.yaml +++ b/tests/fixtures/layouts/aws/nodepool-image-image.yaml @@ -69,13 +69,13 @@ import-method: image - section: - name: aws-us-west-2 + name: aws-us-east-1 parent: aws-base - region: us-west-2 + region: us-east-1 - provider: - name: aws-us-west-2-main - section: aws-us-west-2 + name: aws-us-east-1-main + section: aws-us-east-1 labels: - name: debian-local-normal key-name: zuul diff --git a/tests/fixtures/layouts/aws/nodepool-image-snapshot.yaml b/tests/fixtures/layouts/aws/nodepool-image-snapshot.yaml index b347d50f93..c546c6eb6b 100644 --- a/tests/fixtures/layouts/aws/nodepool-image-snapshot.yaml +++ b/tests/fixtures/layouts/aws/nodepool-image-snapshot.yaml @@ -68,13 +68,13 @@ - name: debian-local - section: - name: aws-us-west-2 + name: aws-us-east-1 parent: aws-base - region: us-west-2 + region: us-east-1 - provider: - name: aws-us-west-2-main - section: aws-us-west-2 + name: aws-us-east-1-main + 
section: aws-us-east-1 labels: - name: debian-local-normal key-name: zuul diff --git a/tests/fixtures/layouts/openstack/nodepool.yaml b/tests/fixtures/layouts/openstack/nodepool.yaml index 9f8c9903cc..b1069140bf 100644 --- a/tests/fixtures/layouts/openstack/nodepool.yaml +++ b/tests/fixtures/layouts/openstack/nodepool.yaml @@ -105,6 +105,8 @@ launch-timeout: 600 launch-attempts: 2 key-name: zuul + floating-ip-cleanup: true + port-cleanup-interval: 1 flavors: - name: normal flavor-name: Fake Flavor diff --git a/tests/unit/test_aws_driver.py b/tests/unit/test_aws_driver.py index 0fba590188..12ac85e5ca 100644 --- a/tests/unit/test_aws_driver.py +++ b/tests/unit/test_aws_driver.py @@ -17,10 +17,12 @@ import concurrent.futures import contextlib import time from unittest import mock +import urllib.parse import fixtures from moto import mock_aws import boto3 +import botocore.exceptions from zuul.driver.aws import AwsDriver from zuul.driver.aws.awsmodel import AwsProviderNode @@ -145,9 +147,9 @@ class TestAwsDriver(BaseCloudDriverTest): super().setUp() - def tearDown(self): + def shutdown(self): + super().shutdown() self.mock_aws.stop() - super().tearDown() def _assertProviderNodeAttributes(self, pnode): super()._assertProviderNodeAttributes(pnode) @@ -289,6 +291,246 @@ class TestAwsDriver(BaseCloudDriverTest): def test_aws_diskimage_ebs_direct(self): self._test_diskimage() + @simple_layout('layouts/nodepool.yaml', enable_nodepool=True) + def test_aws_resource_cleanup(self): + self.waitUntilSettled() + self.launcher.cleanup_worker.INTERVAL = 1 + # This tests everything except the image imports + # Start by setting up leaked resources + system_id = self.launcher.system.system_id + instance_tags = [ + {'Key': 'zuul_system_id', 'Value': system_id}, + {'Key': 'zuul_node_uuid', 'Value': '0000000042'}, + ] + + s3_tags = { + 'zuul_system_id': system_id, + 'zuul_upload_uuid': '0000000042', + } + + reservation = self.ec2_client.run_instances( + ImageId="ami-12c6146b", MinCount=1, MaxCount=1, + BlockDeviceMappings=[{ + 'DeviceName': '/dev/sda1', + 'Ebs': { + 'VolumeSize': 80, + 'DeleteOnTermination': False + } + }], + TagSpecifications=[{ + 'ResourceType': 'instance', + 'Tags': instance_tags + }, { + 'ResourceType': 'volume', + 'Tags': instance_tags + }] + ) + instance_id = reservation['Instances'][0]['InstanceId'] + + bucket = self.s3.Bucket('zuul') + bucket.put_object(Body=b'hi', + Key='testimage', + Tagging=urllib.parse.urlencode(s3_tags)) + obj = self.s3.Object('zuul', 'testimage') + # This effectively asserts the object exists + self.s3_client.get_object_tagging( + Bucket=obj.bucket_name, Key=obj.key) + + instance = self.ec2.Instance(instance_id) + self.assertEqual(instance.state['Name'], 'running') + + volume_id = list(instance.volumes.all())[0].id + volume = self.ec2.Volume(volume_id) + self.assertEqual(volume.state, 'in-use') + + self.log.debug("Start cleanup worker") + self.launcher.cleanup_worker.start() + + for _ in iterate_timeout(30, 'instance deletion'): + instance = self.ec2.Instance(instance_id) + if instance.state['Name'] == 'terminated': + break + time.sleep(1) + + for _ in iterate_timeout(30, 'volume deletion'): + volume = self.ec2.Volume(volume_id) + try: + if volume.state == 'deleted': + break + except botocore.exceptions.ClientError: + # Probably not found + break + time.sleep(1) + + for _ in iterate_timeout(30, 'object deletion'): + obj = self.s3.Object('zuul', 'testimage') + try: + self.s3_client.get_object_tagging( + Bucket=obj.bucket_name, Key=obj.key) + except 
self.s3_client.exceptions.NoSuchKey: + break + time.sleep(1) + + @simple_layout('layouts/nodepool.yaml', enable_nodepool=True) + def test_aws_resource_cleanup_import_snapshot(self): + # This tests the import_snapshot path + self.waitUntilSettled() + self.launcher.cleanup_worker.INTERVAL = 1 + system_id = self.launcher.system.system_id + + # Start by setting up leaked resources + image_tags = [ + {'Key': 'zuul_system_id', 'Value': system_id}, + {'Key': 'zuul_upload_uuid', 'Value': '0000000042'}, + ] + + task = self.fake_aws.import_snapshot( + DiskContainer={ + 'Format': 'ova', + 'UserBucket': { + 'S3Bucket': 'zuul', + 'S3Key': 'testfile', + } + }, + TagSpecifications=[{ + 'ResourceType': 'import-snapshot-task', + 'Tags': image_tags, + }]) + snapshot_id = self.fake_aws.finish_import_snapshot(task) + + register_response = self.ec2_client.register_image( + Architecture='amd64', + BlockDeviceMappings=[ + { + 'DeviceName': '/dev/sda1', + 'Ebs': { + 'DeleteOnTermination': True, + 'SnapshotId': snapshot_id, + 'VolumeSize': 20, + 'VolumeType': 'gp2', + }, + }, + ], + RootDeviceName='/dev/sda1', + VirtualizationType='hvm', + Name='testimage', + ) + image_id = register_response['ImageId'] + + ami = self.ec2.Image(image_id) + new_snapshot_id = ami.block_device_mappings[0]['Ebs']['SnapshotId'] + self.fake_aws.change_snapshot_id(task, new_snapshot_id) + + # Note that the resulting image and snapshot do not have tags + # applied, so we test the automatic retagging methods in the + # adapter. + + image = self.ec2.Image(image_id) + self.assertEqual(image.state, 'available') + + snap = self.ec2.Snapshot(snapshot_id) + self.assertEqual(snap.state, 'completed') + + # Now that the leaked resources exist, start the worker and + # wait for it to clean them. + self.log.debug("Start cleanup worker") + self.launcher.cleanup_worker.start() + + for _ in iterate_timeout(30, 'ami deletion'): + image = self.ec2.Image(image_id) + try: + # If this has a value the image was not deleted + if image.state == 'available': + # Definitely not deleted yet + pass + except AttributeError: + # Per AWS API, a recently deleted image is empty and + # looking at the state raises an AttributeFailure; see + # https://github.com/boto/boto3/issues/2531. The image + # was deleted, so we continue on here + break + time.sleep(1) + + for _ in iterate_timeout(30, 'snapshot deletion'): + snap = self.ec2.Snapshot(new_snapshot_id) + try: + if snap.state == 'deleted': + break + except botocore.exceptions.ClientError: + # Probably not found + break + time.sleep(1) + + @simple_layout('layouts/nodepool.yaml', enable_nodepool=True) + def test_aws_resource_cleanup_import_image(self): + # This tests the import_image path + self.waitUntilSettled() + self.launcher.cleanup_worker.INTERVAL = 1 + system_id = self.launcher.system.system_id + + # Start by setting up leaked resources + image_tags = [ + {'Key': 'zuul_system_id', 'Value': system_id}, + {'Key': 'zuul_upload_uuid', 'Value': '0000000042'}, + ] + + # The image import path: + task = self.fake_aws.import_image( + DiskContainers=[{ + 'Format': 'ova', + 'UserBucket': { + 'S3Bucket': 'zuul', + 'S3Key': 'testfile', + } + }], + TagSpecifications=[{ + 'ResourceType': 'import-image-task', + 'Tags': image_tags, + }]) + image_id, snapshot_id = self.fake_aws.finish_import_image(task) + + # Note that the resulting image and snapshot do not have tags + # applied, so we test the automatic retagging methods in the + # adapter. 
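For reference, iterate_timeout, which these cleanup tests poll with, is a small generator helper from tests/base.py. A rough sketch of its behavior, inferred from the call sites above (the real helper may differ in details):

    import time

    def iterate_timeout(max_seconds, purpose):
        # Yield an increasing counter until the deadline passes, then fail
        # with a message naming what we were waiting for.
        start = time.monotonic()
        count = 0
        while time.monotonic() - start < max_seconds:
            count += 1
            yield count
        raise Exception(f"Timeout waiting for {purpose}")

Each loop iteration in the tests sleeps for a second and re-checks whether moto still reports the leaked resource before the timeout fires.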
+ + image = self.ec2.Image(image_id) + self.assertEqual(image.state, 'available') + + snap = self.ec2.Snapshot(snapshot_id) + self.assertEqual(snap.state, 'completed') + + # Now that the leaked resources exist, start the provider and + # wait for it to clean them. + # Now that the leaked resources exist, start the worker and + # wait for it to clean them. + self.log.debug("Start cleanup worker") + self.launcher.cleanup_worker.start() + + for _ in iterate_timeout(30, 'ami deletion'): + image = self.ec2.Image(image_id) + try: + # If this has a value the image was not deleted + if image.state == 'available': + # Definitely not deleted yet + pass + except AttributeError: + # Per AWS API, a recently deleted image is empty and + # looking at the state raises an AttributeFailure; see + # https://github.com/boto/boto3/issues/2531. The image + # was deleted, so we continue on here + break + time.sleep(1) + + for _ in iterate_timeout(30, 'snapshot deletion'): + snap = self.ec2.Snapshot(snapshot_id) + try: + if snap.state == 'deleted': + break + except botocore.exceptions.ClientError: + # Probably not found + break + time.sleep(1) + @simple_layout('layouts/nodepool.yaml', enable_nodepool=True) def test_state_machines_instance(self): self._test_state_machines("debian-normal") diff --git a/tests/unit/test_launcher.py b/tests/unit/test_launcher.py index 7f5054e047..7ed4ce60e1 100644 --- a/tests/unit/test_launcher.py +++ b/tests/unit/test_launcher.py @@ -168,10 +168,8 @@ class LauncherBaseTestCase(ZuulTestCase): self.mock_aws.start() # Must start responses after mock_aws self.useFixture(ImageMocksFixture()) - self.s3 = boto3.resource('s3', region_name='us-west-2') - self.s3.create_bucket( - Bucket='zuul', - CreateBucketConfiguration={'LocationConstraint': 'us-west-2'}) + self.s3 = boto3.resource('s3', region_name='us-east-1') + self.s3.create_bucket(Bucket='zuul') self.addCleanup(self.mock_aws.stop) self.patch(zuul.driver.aws.awsendpoint, 'CACHE_TTL', 1) diff --git a/tests/unit/test_openstack_driver.py b/tests/unit/test_openstack_driver.py index 79475ced84..7c6785790c 100644 --- a/tests/unit/test_openstack_driver.py +++ b/tests/unit/test_openstack_driver.py @@ -13,6 +13,7 @@ # under the License. 
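The tests/unit/test_launcher.py hunk above can drop CreateBucketConfiguration because S3 rejects an explicit LocationConstraint for us-east-1; the constraint is required only for other regions. A region-agnostic helper would look roughly like this (a sketch for illustration, not part of the patch; the bucket name is arbitrary):

    import boto3

    def create_bucket(name, region):
        s3 = boto3.client('s3', region_name=region)
        if region == 'us-east-1':
            # us-east-1 is the default location and must not be named explicitly
            return s3.create_bucket(Bucket=name)
        return s3.create_bucket(
            Bucket=name,
            CreateBucketConfiguration={'LocationConstraint': region})

    create_bucket('zuul', 'us-east-1')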
import os +import time import fixtures @@ -21,14 +22,17 @@ import zuul.driver.openstack.openstackendpoint from tests.fake_openstack import ( FakeOpenstackCloud, + FakeOpenstackFloatingIp, + FakeOpenstackPort, FakeOpenstackProviderEndpoint, ) from tests.base import ( FIXTURE_DIR, ZuulTestCase, - simple_layout, - return_data, driver_config, + iterate_timeout, + return_data, + simple_layout, ) from tests.unit.test_launcher import ImageMocksFixture from tests.unit.test_cloud_driver import BaseCloudDriverTest @@ -88,9 +92,6 @@ class BaseOpenstackDriverTest(ZuulTestCase): 'CACHE_TTL', 1) super().setUp() - def tearDown(self): - super().tearDown() - class TestOpenstackDriver(BaseOpenstackDriverTest, BaseCloudDriverTest): def _assertProviderNodeAttributes(self, pnode): @@ -133,8 +134,57 @@ class TestOpenstackDriver(BaseOpenstackDriverTest, BaseCloudDriverTest): def test_openstack_diskimage(self): self._test_diskimage() + # Openstack-driver specific tests + + @simple_layout('layouts/openstack/nodepool.yaml', enable_nodepool=True) + def test_openstack_resource_cleanup(self): + self.waitUntilSettled() + self.launcher.cleanup_worker.INTERVAL = 1 + conn = self.fake_cloud._getConnection() + system_id = self.launcher.system.system_id + tags = { + 'zuul_system_id': system_id, + 'zuul_node_uuid': '0000000042', + } + conn.create_server( + name="test", + meta=tags, + ) + self.assertEqual(1, len(conn.list_servers())) + + fip = FakeOpenstackFloatingIp( + id='42', + floating_ip_address='fake', + status='ACTIVE', + ) + conn.cloud.floating_ips.append(fip) + self.assertEqual(1, len(conn.list_floating_ips())) + + port = FakeOpenstackPort( + id='43', + status='DOWN', + device_owner='compute:foo', + ) + conn.cloud.ports.append(port) + self.assertEqual(1, len(conn.list_ports())) + + self.log.debug("Start cleanup worker") + self.launcher.cleanup_worker.start() + + for _ in iterate_timeout(30, 'instance deletion'): + if not conn.list_servers(): + break + time.sleep(1) + for _ in iterate_timeout(30, 'fip deletion'): + if not conn.list_floating_ips(): + break + time.sleep(1) + for _ in iterate_timeout(30, 'port deletion'): + if not conn.list_ports(): + break + time.sleep(1) + -# Openstack-driver specific tests class TestOpenstackDriverFloatingIp(BaseOpenstackDriverTest, BaseCloudDriverTest): # This test is for nova-net clouds with floating ips that require diff --git a/zuul/configloader.py b/zuul/configloader.py index 2b54b884b0..63c0e6c079 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -2914,7 +2914,8 @@ class TenantParser(object): provider = connection.driver.getProvider( connection, tenant.name, provider_config.canonical_name, - flat_config) + flat_config, + parse_context.scheduler.system.system_id) shadow_layout.addProvider(provider) for e in parsed_config.queue_errors: diff --git a/zuul/driver/aws/__init__.py b/zuul/driver/aws/__init__.py index b302e66cda..366f7be7f9 100644 --- a/zuul/driver/aws/__init__.py +++ b/zuul/driver/aws/__init__.py @@ -28,9 +28,10 @@ class AwsDriver(Driver, EndpointCacheMixin, return awsconnection.AwsConnection(self, name, config) def getProvider(self, connection, tenant_name, canonical_name, - provider_config): + provider_config, system_id): return awsprovider.AwsProvider( - self, connection, tenant_name, canonical_name, provider_config) + self, connection, tenant_name, canonical_name, provider_config, + system_id) def getProviderClass(self): return awsprovider.AwsProvider @@ -52,7 +53,8 @@ class AwsDriver(Driver, EndpointCacheMixin, ]) return self.getEndpointById( 
endpoint_id, - create_args=(self, provider.connection, provider.region)) + create_args=(self, provider.connection, provider.region, + provider.system_id)) def stop(self): self.stopEndpoints() diff --git a/zuul/driver/aws/awsendpoint.py b/zuul/driver/aws/awsendpoint.py index 7df530cbe8..a12069b788 100644 --- a/zuul/driver/aws/awsendpoint.py +++ b/zuul/driver/aws/awsendpoint.py @@ -363,9 +363,9 @@ class AwsProviderEndpoint(BaseProviderEndpoint): IMAGE_UPLOAD_SLEEP = 30 LAUNCH_TEMPLATE_PREFIX = 'zuul-launch-template' - def __init__(self, driver, connection, region): + def __init__(self, driver, connection, region, system_id): name = f'{connection.connection_name}-{region}' - super().__init__(driver, connection, name) + super().__init__(driver, connection, name, system_id) self.log = logging.getLogger(f"zuul.aws.{self.name}") self.region = region @@ -409,6 +409,13 @@ class AwsProviderEndpoint(BaseProviderEndpoint): self.aws_quotas = self.aws.client("service-quotas") self.ebs_client = self.aws.client('ebs') self.provider_label_template_names = {} + # In listResources, we reconcile AMIs which appear to be + # imports but have no nodepool tags, however it's possible + # that these aren't nodepool images. If we determine that's + # the case, we'll add their ids here so we don't waste our + # time on that again. + self.not_our_images = set() + self.not_our_snapshots = set() def startEndpoint(self): self._running = True @@ -496,7 +503,14 @@ class AwsProviderEndpoint(BaseProviderEndpoint): def postConfig(self, provider): self._createLaunchTemplates(provider) - def listResources(self, bucket_name): + def listResources(self, providers): + bucket_names = set() + for provider in providers: + if bn := provider.object_storage.get('bucket-name'): + bucket_names.add(bn) + self._tagSnapshots() + self._tagAmis() + for host in self._listHosts(): try: if host['State'].lower() in [ @@ -540,7 +554,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint): continue yield AwsResource(tag_list_to_dict(snap.get('Tags')), AwsResource.TYPE_SNAPSHOT, snap['SnapshotId']) - if bucket_name: + for bucket_name in bucket_names: for obj in self._listObjects(bucket_name): with self.non_mutating_rate_limiter: try: @@ -549,10 +563,10 @@ class AwsProviderEndpoint(BaseProviderEndpoint): except botocore.exceptions.ClientError: continue yield AwsResource(tag_list_to_dict(tags['TagSet']), - AwsResource.TYPE_OBJECT, obj.key) + AwsResource.TYPE_OBJECT, obj.key, + bucket_name=bucket_name) - def deleteResource(self, resource, bucket_name): - self.deleteResource(resource, bucket_name) + def deleteResource(self, resource): self.log.info(f"Deleting leaked {resource.type}: {resource.id}") if resource.type == AwsResource.TYPE_HOST: self._releaseHost(resource.id, immediate=True) @@ -565,7 +579,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint): if resource.type == AwsResource.TYPE_SNAPSHOT: self._deleteSnapshot(resource.id) if resource.type == AwsResource.TYPE_OBJECT: - self._deleteObject(bucket_name, resource.id) + self._deleteObject(resource.bucket_name, resource.id) def listInstances(self): volumes = {} @@ -901,14 +915,14 @@ class AwsProviderEndpoint(BaseProviderEndpoint): # Local implementation below - def _tagAmis(self, provider_name, not_our_images): + def _tagAmis(self): # There is no way to tag imported AMIs, so this routine # "eventually" tags them. We look for any AMIs without tags # and we copy the tags from the associated snapshot or image # import task. 
to_examine = [] for ami in self._listAmis(): - if ami['ImageId'] in not_our_images: + if ami['ImageId'] in self.not_our_images: continue if ami.get('Tags'): continue @@ -921,7 +935,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint): # This was an import image (not snapshot) so let's # try to find tags from the import task. tags = tag_list_to_dict(task.get('Tags')) - if (tags.get('zuul_provider_name') == provider_name): + if (tags.get('zuul_system_id') == self.system_id): # Copy over tags self.log.debug( "Copying tags from import task %s to AMI", @@ -936,16 +950,16 @@ class AwsProviderEndpoint(BaseProviderEndpoint): # any tags from the snapshot import task, otherwise, mark # it as an image we can ignore in future runs. if len(ami.get('BlockDeviceMappings', [])) < 1: - not_our_images.add(ami['ImageId']) + self.not_our_images.add(ami['ImageId']) continue bdm = ami['BlockDeviceMappings'][0] ebs = bdm.get('Ebs') if not ebs: - not_our_images.add(ami['ImageId']) + self.not_our_images.add(ami['ImageId']) continue snapshot_id = ebs.get('SnapshotId') if not snapshot_id: - not_our_images.add(ami['ImageId']) + self.not_our_images.add(ami['ImageId']) continue to_examine.append((ami, snapshot_id)) if not to_examine: @@ -963,12 +977,11 @@ class AwsProviderEndpoint(BaseProviderEndpoint): task_map[task_snapshot_id] = task['Tags'] for ami, snapshot_id in to_examine: - tags = task_map.get(snapshot_id) + tags = tag_list_to_dict(task_map.get(snapshot_id)) if not tags: - not_our_images.add(ami['ImageId']) + self.not_our_images.add(ami['ImageId']) continue - metadata = tag_list_to_dict(tags) - if (metadata.get('zuul_provider_name') == provider_name): + if (tags.get('zuul_system_id') == self.system_id): # Copy over tags self.log.debug( "Copying tags from import task to image %s", @@ -976,15 +989,15 @@ class AwsProviderEndpoint(BaseProviderEndpoint): with self.rate_limiter: self.ec2_client.create_tags( Resources=[ami['ImageId']], - Tags=task['Tags']) + Tags=task_map.get(snapshot_id)) else: - not_our_images.add(ami['ImageId']) + self.not_our_images.add(ami['ImageId']) - def _tagSnapshots(self, provider_name, not_our_snapshots): + def _tagSnapshots(self): # See comments for _tagAmis to_examine = [] for snap in self._listSnapshots(): - if snap['SnapshotId'] in not_our_snapshots: + if snap['SnapshotId'] in self.not_our_snapshots: continue try: if snap.get('Tags'): @@ -1004,7 +1017,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint): # This was an import image (not snapshot) so let's # try to find tags from the import task. 
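Both retagging passes rely on tag_list_to_dict to turn the AWS tag-list format into something comparable with the launcher metadata. The helper is defined elsewhere in the AWS driver and is untouched by this patch; it behaves roughly like:

    def tag_list_to_dict(tags):
        # AWS returns tags as [{'Key': k, 'Value': v}, ...]; flatten to a dict.
        if tags is None:
            return {}
        return {t['Key']: t['Value'] for t in tags}

Ownership then reduces to comparing the stamped system id, tag_list_to_dict(task.get('Tags')).get('zuul_system_id') == self.system_id, which is what replaces the old zuul_provider_name checks throughout this file.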
tags = tag_list_to_dict(task.get('Tags')) - if (tags.get('zuul_provider_name') == provider_name): + if (tags.get('zuul_system_id') == self.system_id): # Copy over tags self.log.debug( f"Copying tags from import task {task_id}" @@ -1034,12 +1047,11 @@ class AwsProviderEndpoint(BaseProviderEndpoint): task_map[task_snapshot_id] = task['Tags'] for snap in to_examine: - tags = task_map.get(snap['SnapshotId']) + tags = tag_list_to_dict(task_map.get(snap['SnapshotId'])) if not tags: - not_our_snapshots.add(snap['SnapshotId']) + self.not_our_snapshots.add(snap['SnapshotId']) continue - metadata = tag_list_to_dict(tags) - if (metadata.get('zuul_provider_name') == provider_name): + if (tags.get('zuul_system_id') == self.system_id): # Copy over tags self.log.debug( "Copying tags from import task to snapshot %s", @@ -1047,9 +1059,9 @@ class AwsProviderEndpoint(BaseProviderEndpoint): with self.rate_limiter: self.ec2_client.create_tags( Resources=[snap['SnapshotId']], - Tags=tags) + Tags=task_map.get(snap['SnapshotId'])) else: - not_our_snapshots.add(snap['SnapshotId']) + self.not_our_snapshots.add(snap['SnapshotId']) def _getImportImageTask(self, task_id): paginator = self.ec2_client.get_paginator( @@ -1419,7 +1431,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint): self.log.info("Creating launch templates") tags = { - 'zuul_managed': 'true', + 'zuul_system_id': self.system_id, 'zuul_provider_name': provider.canonical_name, } existing_templates = dict() # for clean up and avoid creation attempt @@ -1512,14 +1524,13 @@ class AwsProviderEndpoint(BaseProviderEndpoint): for template_name, template in existing_templates.items(): if template_name not in configured_templates: # check if the template was created by the current provider - tags = template.get('Tags', []) - for tag in tags: - if (tag['Key'] == 'zuul_provider_name' and - tag['Value'] == provider.canonical_name): - self.ec2_client.delete_launch_template( - LaunchTemplateName=template_name) - self.log.debug("Deleted unused launch template: %s", - template_name) + tags = tag_list_to_dict(template.get('Tags', [])) + if (tags.get('zuul_system_id') == self.system_id and + tags.get('zuul_provider_name') == provider.canonical_name): + self.ec2_client.delete_launch_template( + LaunchTemplateName=template_name) + self.log.debug("Deleted unused launch template: %s", + template_name) def _getLaunchTemplateName(self, args): hasher = hashlib.sha256() @@ -1793,7 +1804,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint): ids = [] for (del_id, log) in records: ids.append(del_id) - log.debug(f"Deleting instance {del_id}") + log.debug("Deleting instance %s", del_id) count = len(ids) with self.rate_limiter(log.debug, f"Deleted {count} instances"): self.ec2_client.terminate_instances(InstanceIds=ids) @@ -1805,7 +1816,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint): ids = [] for (del_id, log) in records: ids.append(del_id) - log.debug(f"Releasing host {del_id}") + log.debug("Releasing host %s", del_id) count = len(ids) with self.rate_limiter(log.debug, f"Released {count} hosts"): self.ec2_client.release_hosts(HostIds=ids) @@ -1817,7 +1828,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint): if host['HostId'] == external_id: break else: - log.warning(f"Host not found when releasing {external_id}") + log.warning("Host not found when releasing %s", external_id) return None if immediate: with self.rate_limiter(log.debug, "Released host"): @@ -1835,7 +1846,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint): if instance['InstanceId'] == external_id: break else: - 
log.warning(f"Instance not found when deleting {external_id}") + log.warning("Instance not found when deleting %s", external_id) return None if immediate: with self.rate_limiter(log.debug, "Deleted instance"): @@ -1851,11 +1862,17 @@ class AwsProviderEndpoint(BaseProviderEndpoint): if volume['VolumeId'] == external_id: break else: - self.log.warning(f"Volume not found when deleting {external_id}") + self.log.warning("Volume not found when deleting %s", external_id) return None with self.rate_limiter(self.log.debug, "Deleted volume"): self.log.debug(f"Deleting volume {external_id}") - self.ec2_client.delete_volume(VolumeId=volume['VolumeId']) + try: + self.ec2_client.delete_volume(VolumeId=volume['VolumeId']) + except botocore.exceptions.ClientError as error: + if error.response['Error']['Code'] == 'NotFound': + self.log.warning( + "Volume not found when deleting %s", external_id) + return None return volume def _deleteAmi(self, external_id): @@ -1863,11 +1880,17 @@ class AwsProviderEndpoint(BaseProviderEndpoint): if ami['ImageId'] == external_id: break else: - self.log.warning(f"AMI not found when deleting {external_id}") + self.log.warning("AMI not found when deleting %s", external_id) return None with self.rate_limiter: self.log.debug(f"Deleting AMI {external_id}") - self.ec2_client.deregister_image(ImageId=ami['ImageId']) + try: + self.ec2_client.deregister_image(ImageId=ami['ImageId']) + except botocore.exceptions.ClientError as error: + if error.response['Error']['Code'] == 'NotFound': + self.log.warning( + "AMI not found when deleting %s", external_id) + return None return ami def _deleteSnapshot(self, external_id): @@ -1875,14 +1898,21 @@ class AwsProviderEndpoint(BaseProviderEndpoint): if snap['SnapshotId'] == external_id: break else: - self.log.warning(f"Snapshot not found when deleting {external_id}") + self.log.warning("Snapshot not found when deleting %s", + external_id) return None with self.rate_limiter: self.log.debug(f"Deleting Snapshot {external_id}") - self.ec2_client.delete_snapshot(SnapshotId=snap['SnapshotId']) + try: + self.ec2_client.delete_snapshot(SnapshotId=snap['SnapshotId']) + except botocore.exceptions.ClientError as error: + if error.response['Error']['Code'] == 'NotFound': + self.log.warning( + "Snapshot not found when deleting %s", external_id) + return None return snap def _deleteObject(self, bucket_name, external_id): with self.rate_limiter: - self.log.debug(f"Deleting object {external_id}") + self.log.debug("Deleting object %s", external_id) self.s3.Object(bucket_name, external_id).delete() diff --git a/zuul/driver/aws/awsmodel.py b/zuul/driver/aws/awsmodel.py index ef3fc5efea..07832bbf9d 100644 --- a/zuul/driver/aws/awsmodel.py +++ b/zuul/driver/aws/awsmodel.py @@ -76,6 +76,7 @@ class AwsResource(statemachine.Resource): TYPE_VOLUME = 'volume' TYPE_OBJECT = 'object' - def __init__(self, metadata, type, id): + def __init__(self, metadata, type, id, bucket_name=None): super().__init__(metadata, type) self.id = id + self.bucket_name = bucket_name diff --git a/zuul/driver/aws/awsprovider.py b/zuul/driver/aws/awsprovider.py index 939ab5d557..327b85349d 100644 --- a/zuul/driver/aws/awsprovider.py +++ b/zuul/driver/aws/awsprovider.py @@ -236,19 +236,6 @@ class AwsProvider(BaseProvider, subclass_id='aws'): log = logging.getLogger("zuul.AwsProvider") schema = AwsProviderSchema().getProviderSchema() - def __init__(self, *args, **kw): - super().__init__(*args, **kw) - # In listResources, we reconcile AMIs which appear to be - # imports but have no nodepool tags, 
however it's possible - # that these aren't nodepool images. If we determine that's - # the case, we'll add their ids here so we don't waste our - # time on that again. We do not need to serialize these; - # these are ephemeral caches. - self._set( - not_our_images=set(), - not_our_snapshots=set(), - ) - @property def endpoint(self): ep = getattr(self, '_endpoint', None) @@ -305,19 +292,6 @@ class AwsProvider(BaseProvider, subclass_id='aws'): def listInstances(self): return self.endpoint.listInstances() - def listResources(self): - bucket_name = self.object_storage.get('bucket-name') - - self.endpoint._tagSnapshots( - self.tenant_scoped_name, self.not_our_snapshots) - self.endpoint._tagAmis( - self.tenant_scoped_name, self.not_our_images) - return self.endpoint.listResources(bucket_name) - - def deleteResource(self, resource): - bucket_name = self.object_storage.get('bucket-name') - self.endpoint.deleteResource(resource, bucket_name) - def getQuotaLimits(self): # Get the instance and volume types that this provider handles instance_types = {} diff --git a/zuul/driver/openstack/__init__.py b/zuul/driver/openstack/__init__.py index b6be116982..8849725dc7 100644 --- a/zuul/driver/openstack/__init__.py +++ b/zuul/driver/openstack/__init__.py @@ -33,9 +33,10 @@ class OpenstackDriver(Driver, EndpointCacheMixin, return openstackconnection.OpenstackConnection(self, name, config) def getProvider(self, connection, tenant_name, canonical_name, - provider_config): + provider_config, system_id): return openstackprovider.OpenstackProvider( - self, connection, tenant_name, canonical_name, provider_config) + self, connection, tenant_name, canonical_name, provider_config, + system_id) def getProviderClass(self): return openstackprovider.OpenstackProvider @@ -46,17 +47,19 @@ class OpenstackDriver(Driver, EndpointCacheMixin, def getProviderNodeClass(self): return openstackmodel.OpenstackProviderNode - def _getEndpoint(self, connection, region): + def _getEndpoint(self, connection, region, system_id): region_str = region or '' endpoint_id = '/'.join([ urllib.parse.quote_plus(connection.connection_name), urllib.parse.quote_plus(region_str), ]) - return self.getEndpointById(endpoint_id, - create_args=(self, connection, region)) + return self.getEndpointById( + endpoint_id, + create_args=(self, connection, region, system_id)) def getEndpoint(self, provider): - return self._getEndpoint(provider.connection, provider.region) + return self._getEndpoint(provider.connection, provider.region, + provider.system_id) def stop(self): self.stopEndpoints() diff --git a/zuul/driver/openstack/openstackendpoint.py b/zuul/driver/openstack/openstackendpoint.py index 43ea81aaf1..4c749a2f18 100644 --- a/zuul/driver/openstack/openstackendpoint.py +++ b/zuul/driver/openstack/openstackendpoint.py @@ -357,9 +357,9 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint): IMAGE_UPLOAD_SLEEP = 30 - def __init__(self, driver, connection, region): + def __init__(self, driver, connection, region, system_id): name = f'{connection.connection_name}-{region}' - super().__init__(driver, connection, name) + super().__init__(driver, connection, name, system_id) self.log = logging.getLogger(f"zuul.openstack.{self.name}") self.region = region @@ -411,7 +411,7 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint): self.api_executor.shutdown() self._running = False - def listResources(self): + def listResources(self, providers): for server in self._listServers(): if server['status'].lower() == 'deleted': continue @@ -422,9 +422,12 @@ class 
OpenstackProviderEndpoint(BaseProviderEndpoint): # automatic resource cleanup in cleanupLeakedResources because # openstack doesn't store metadata on those objects, so we # call internal cleanup methods here. - if self.provider.port_cleanup_interval: - self._cleanupLeakedPorts() - if self.provider.clean_floating_ips: + intervals = [p.port_cleanup_interval for p in providers + if p.port_cleanup_interval] + interval = min(intervals or [0]) + if interval: + self._cleanupLeakedPorts(interval) + if any([p.floating_ip_cleanup for p in providers]): self._cleanupFloatingIps() def deleteResource(self, resource): @@ -920,7 +923,7 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint): ret.append(p) return ret - def _cleanupLeakedPorts(self): + def _cleanupLeakedPorts(self, interval): if not self._last_port_cleanup: self._last_port_cleanup = time.monotonic() ports = self._listPorts(status='DOWN') @@ -930,7 +933,7 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint): # Return if not enough time has passed between cleanup last_check_in_secs = int(time.monotonic() - self._last_port_cleanup) - if last_check_in_secs <= self.provider.port_cleanup_interval: + if last_check_in_secs <= interval: return ports = self._listPorts(status='DOWN') @@ -944,15 +947,15 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint): self._deletePort(port_id) except Exception: self.log.exception("Exception deleting port %s in %s:", - port_id, self.provider.name) + port_id, self.name) else: removed_count += 1 self.log.debug("Removed DOWN port %s in %s", - port_id, self.provider.name) + port_id, self.name) - if self._statsd and removed_count: - key = 'nodepool.provider.%s.leaked.ports' % (self.provider.name) - self._statsd.incr(key, removed_count) + # if self._statsd and removed_count: + # key = 'nodepool.provider.%s.leaked.ports' % (self.name) + # self._statsd.incr(key, removed_count) self._last_port_cleanup = time.monotonic() @@ -973,10 +976,10 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint): # indicate something happened. if type(did_clean) is bool: did_clean = 1 - if self._statsd: - key = ('nodepool.provider.%s.leaked.floatingips' - % self.provider.name) - self._statsd.incr(key, did_clean) + # if self._statsd: + # key = ('nodepool.provider.%s.leaked.floatingips' + # % self.name) + # self._statsd.incr(key, did_clean) def getConsoleLog(self, label, external_id): if not label.console_log: diff --git a/zuul/driver/openstack/openstackprovider.py b/zuul/driver/openstack/openstackprovider.py index bd9173b6aa..4f2ca0e0da 100644 --- a/zuul/driver/openstack/openstackprovider.py +++ b/zuul/driver/openstack/openstackprovider.py @@ -167,6 +167,8 @@ class OpenstackProviderSchema(BaseProviderSchema): openstack_provider_schema = vs.Schema({ Optional('region'): Nullable(str), Optional('resource-limits', default=dict()): resource_limits, + Optional('floating-ip-cleanup', default=False): bool, + Optional('port-cleanup-interval', default=600): int, }) return assemble( @@ -182,19 +184,6 @@ class OpenstackProvider(BaseProvider, subclass_id='openstack'): log = logging.getLogger("zuul.OpenstackProvider") schema = OpenstackProviderSchema().getProviderSchema() - def __init__(self, *args, **kw): - super().__init__(*args, **kw) - # In listResources, we reconcile AMIs which appear to be - # imports but have no nodepool tags, however it's possible - # that these aren't nodepool images. If we determine that's - # the case, we'll add their ids here so we don't waste our - # time on that again. 
We do not need to serialize these; - # these are ephemeral caches. - self._set( - not_our_images=set(), - not_our_snapshots=set(), - ) - @property def endpoint(self): ep = getattr(self, '_endpoint', None) @@ -207,7 +196,7 @@ class OpenstackProvider(BaseProvider, subclass_id='openstack'): # We are not fully constructed yet at this point, so we need # to peek to get the region and endpoint. region = provider_config.get('region') - endpoint = connection.driver._getEndpoint(connection, region) + endpoint = connection.driver._getEndpoint(connection, region, None) return OpenstackProviderImage( image_config, provider_config, image_format=endpoint.getImageFormat()) @@ -245,14 +234,6 @@ class OpenstackProvider(BaseProvider, subclass_id='openstack'): def listInstances(self): return self.endpoint.listInstances() - def listResources(self): - # TODO: implement - return [] - - def deleteResource(self, resource): - # TODO: implement - pass - def getQuotaLimits(self): cloud = self.endpoint.getQuotaLimits() zuul = QuotaInformation(default=math.inf, **self.resource_limits) diff --git a/zuul/launcher/server.py b/zuul/launcher/server.py index 190d7486e9..ef1764a93c 100644 --- a/zuul/launcher/server.py +++ b/zuul/launcher/server.py @@ -81,6 +81,10 @@ def scores_for_label(label_cname, candidate_names): } +def endpoint_score(endpoint): + return mmh3.hash(f"{endpoint.canonical_name}", signed=False) + + class NodesetRequestError(Exception): """Errors that should lead to the request being declined.""" pass @@ -238,8 +242,10 @@ class EndpointUploadJob: raise Exception( f"Unable to find image {self.upload.canonical_name}") - # TODO: add upload id, etc - metadata = {} + metadata = { + 'zuul_system_id': self.launcher.system.system_id, + 'zuul_upload_uuid': self.upload.uuid, + } image_name = f'{provider_image.name}-{self.artifact.uuid}' external_id = provider.uploadImage( provider_image, image_name, self.path, self.artifact.format, @@ -667,6 +673,133 @@ class NodescanWorker: last_unready_check = time.monotonic() +class CleanupWorker: + # Delay 60 seconds between iterations + INTERVAL = 60 + log = logging.getLogger("zuul.Launcher") + + def __init__(self, launcher): + self.launcher = launcher + self.wake_event = threading.Event() + self.possibly_leaked_nodes = {} + self.possibly_leaked_uploads = {} + self._running = False + self.thread = None + + def start(self): + self.log.debug("Starting cleanup worker thread") + self._running = True + self.thread = threading.Thread(target=self.run, + name="CleanupWorker") + self.thread.start() + + def stop(self): + self.log.debug("Stopping cleanup worker") + self._running = False + self.wake_event.set() + + def join(self): + self.log.debug("Joining cleanup thread") + if self.thread: + self.thread.join() + self.log.debug("Joined cleanup thread") + + def run(self): + while self._running: + # Wait before performing the first cleanup + self.wake_event.wait(self.INTERVAL) + self.wake_event.clear() + try: + self._run() + except Exception: + self.log.exception("Error in cleanup worker:") + + def getMatchingEndpoints(self): + all_launchers = { + c.hostname: c for c in COMPONENT_REGISTRY.registry.all("launcher")} + + for endpoint in self.launcher.endpoints.values(): + candidate_launchers = { + n: c for n, c in all_launchers.items() + if not c.connection_filter + or endpoint.connection.connection_name in c.connection_filter} + candidate_names = set(candidate_launchers) + launcher_scores = {endpoint_score(endpoint): n + for n in candidate_names} + sorted_scores = sorted(launcher_scores.items()) 
+ for score, launcher_name in sorted_scores: + launcher = candidate_launchers.get(launcher_name) + if not launcher: + # Launcher is no longer online + continue + if launcher.state != launcher.RUNNING: + continue + if launcher.hostname == self.launcher.component_info.hostname: + yield endpoint + break + + def _run(self): + for endpoint in self.getMatchingEndpoints(): + try: + self.cleanupLeakedResources(endpoint) + except Exception: + self.log.exception("Error in cleanup worker:") + + def cleanupLeakedResources(self, endpoint): + newly_leaked_nodes = {} + newly_leaked_uploads = {} + + # Get a list of all providers that share this endpoint. This + # is because some providers may store resources like image + # uploads in multiple per-provider locations. + providers = [ + p for p in self.launcher._getProviders() + if p.getEndpoint().canonical_name == endpoint.canonical_name + ] + + for resource in endpoint.listResources(providers): + if (resource.metadata.get('zuul_system_id') != + self.launcher.system.system_id): + continue + node_id = resource.metadata.get('zuul_node_uuid') + upload_id = resource.metadata.get('zuul_upload_uuid') + if node_id and self.launcher.api.getProviderNode(node_id) is None: + newly_leaked_nodes[node_id] = resource + if node_id in self.possibly_leaked_nodes: + # We've seen this twice now, so it's not a race + # condition. + try: + endpoint.deleteResource(resource) + # if self._statsd: + # key = ('nodepool.provider.%s.leaked.%s' + # % (self.provider.name, + # resource.plural_metric_name)) + # self._statsd.incr(key, 1) + except Exception: + self.log.exception("Unable to delete leaked " + f"resource for node {node_id}") + if (upload_id and + self.launcher.image_upload_registry.getItem( + upload_id) is None): + newly_leaked_uploads[upload_id] = resource + if upload_id in self.possibly_leaked_uploads: + # We've seen this twice now, so it's not a race + # condition. + try: + endpoint.deleteResource(resource) + # if self._statsd: + # key = ('nodepool.provider.%s.leaked.%s' + # % (self.provider.name, + # resource.plural_metric_name)) + # self._statsd.incr(key, 1) + except Exception: + self.log.exception( + "Unable to delete leaked " + f"resource for upload {upload_id}") + self.possibly_leaked_nodes = newly_leaked_nodes + self.possibly_leaked_uploads = newly_leaked_uploads + + class Launcher: log = logging.getLogger("zuul.Launcher") # Max. time to wait for a cache to sync @@ -678,6 +811,11 @@ class Launcher: def __init__(self, config, connections): self._running = True + # The cleanup method requires some extra AWS mocks that are + # not enabled in most tests, so we allow the test suite to + # disable it. + # TODO: switch basic launcher tests to openstack and remove. 
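getMatchingEndpoints above implements the rendezvous-hashing assignment described in the commit message: every launcher scores the candidates for an endpoint, sorts the scores, and cleans the endpoint only if it is itself the first running candidate, so each endpoint is handled by exactly one launcher without locking. In the usual formulation the hash covers both the endpoint and the candidate launcher name; a minimal standalone sketch of that idea (illustrative only, not the patch's helper, and it assumes at least one candidate):

    import mmh3  # the hashing library the launcher already uses

    def pick_cleanup_launcher(endpoint_name, launcher_names):
        # Rendezvous (highest-random-weight) hashing: each candidate gets a
        # deterministic score for this endpoint, every launcher computes the
        # same scores, and the candidate with the lowest score wins.
        scores = {
            mmh3.hash(f"{endpoint_name}-{name}", signed=False): name
            for name in launcher_names
        }
        return scores[min(scores)]

A launcher cleans only the endpoints for which this returns its own hostname; because a stale view at worst triggers a redundant cleanup attempt, no secondary locking is needed, as the commit message notes.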
+ self._start_cleanup = True self.config = config self.connections = connections self.repl = None @@ -740,7 +878,7 @@ class Launcher: self.tenant_layout_state = LayoutStateStore( self.zk_client, self._layoutUpdatedCallback) self.layout_providers_store = LayoutProvidersStore( - self.zk_client, self.connections) + self.zk_client, self.connections, self.system.system_id) self.local_layout_state = {} self.image_build_registry = ImageBuildRegistry( @@ -768,6 +906,7 @@ class Launcher: max_workers=10, thread_name_prefix="UploadWorker", ) + self.cleanup_worker = CleanupWorker(self) def _layoutUpdatedCallback(self): self.layout_updated_event.set() @@ -1477,6 +1616,11 @@ class Launcher: raise ProviderNodeError( f"Unable to find {provider_name} in tenant {tenant_name}") + def _getProviders(self): + for providers in self.tenant_providers.values(): + for provider in providers: + yield provider + def _hasProvider(self, node): try: self._getProviderForNode(node) @@ -1506,6 +1650,10 @@ class Launcher: self.log.debug("Starting launcher thread") self.launcher_thread.start() + if self._start_cleanup: + self.log.debug("Starting cleanup thread") + self.cleanup_worker.start() + def stop(self): self.log.debug("Stopping launcher") self._running = False @@ -1514,10 +1662,12 @@ class Launcher: self.stopRepl() self._command_running = False self.command_socket.stop() - self.connections.stop() + self.cleanup_worker.stop() + self.cleanup_worker.join() self.upload_executor.shutdown() self.endpoint_upload_executor.shutdown() self.nodescan_worker.stop() + self.connections.stop() # Endpoints are stopped by drivers self.log.debug("Stopped launcher") diff --git a/zuul/provider/__init__.py b/zuul/provider/__init__.py index 9b59588e9d..418f633725 100644 --- a/zuul/provider/__init__.py +++ b/zuul/provider/__init__.py @@ -118,10 +118,11 @@ class BaseProviderEndpoint(metaclass=abc.ABCMeta): the unit of visibility of instances, VPCs, images, etc. """ - def __init__(self, driver, connection, name): + def __init__(self, driver, connection, name, system_id): self.driver = driver self.connection = connection self.name = name + self.system_id = system_id self.start_lock = threading.Lock() self.started = False self.stopped = False @@ -210,7 +211,8 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin, def __init__(self, *args): super().__init__() if args: - (driver, connection, tenant_name, canonical_name, config) = args + (driver, connection, tenant_name, canonical_name, config, + system_id) = args config = config.copy() config.pop('_source_context') config.pop('_start_mark') @@ -223,6 +225,7 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin, tenant_name=tenant_name, canonical_name=canonical_name, config=config, + system_id=system_id, **parsed_config, ) @@ -231,7 +234,7 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin, f"canonical_name={self.canonical_name}>") @classmethod - def fromZK(cls, context, path, connections): + def fromZK(cls, context, path, connections, system_id): """Deserialize a Provider (subclass) from ZK. 
To deserialize a Provider from ZK, pass the connection @@ -246,8 +249,11 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin, extra = {'connections': connections} obj = cls._fromRaw(raw_data, zstat, extra) connection = connections.connections[obj.connection_name] - obj._set(connection=connection, - driver=connection.driver) + obj._set( + connection=connection, + driver=connection.driver, + system_id=system_id, + ) return obj def getProviderSchema(self): @@ -582,6 +588,10 @@ class EndpointCacheMixin: self.endpoints[endpoint_id] = endpoint return endpoint + def getEndpoints(self): + with self.endpoints_lock: + return list(self.endpoints.values()) + def stopEndpoints(self): with self.endpoints_lock: for endpoint in self.endpoints.values(): diff --git a/zuul/provider/statemachine.py b/zuul/provider/statemachine.py index 877dac41aa..7ab1e59257 100644 --- a/zuul/provider/statemachine.py +++ b/zuul/provider/statemachine.py @@ -145,3 +145,6 @@ class Resource: self.type = type self.plural_metric_name = type + 's' self.metadata = metadata + + def __repr__(self): + return f'<{self.__class__.__name__} {self.type} {self.metadata}>' diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 40ff9829a3..498e54ae8d 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -345,7 +345,7 @@ class Scheduler(threading.Thread): self.zk_client, self.layout_update_event.set) self.layout_providers_store = LayoutProvidersStore( - self.zk_client, self.connections) + self.zk_client, self.connections, self.system.system_id) self.local_layout_state = {} command_socket = get_default( diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py index bb07b55a76..ebeccdd4bc 100755 --- a/zuul/web/__init__.py +++ b/zuul/web/__init__.py @@ -1121,7 +1121,6 @@ class ZuulWebAPI(object): def __init__(self, zuulweb): self.zuulweb = zuulweb self.zk_client = zuulweb.zk_client - self.system = ZuulSystem(self.zk_client) self.zk_nodepool = ZooKeeperNodepool(self.zk_client, enable_node_cache=True) self.status_caches = {} @@ -2170,7 +2169,7 @@ class ZuulWebAPI(object): if not (node.user_data and isinstance(node.user_data, dict) and node.user_data.get('zuul_system') == - self.system.system_id and + self.zuulweb.system.system_id and node.tenant_name == tenant_name): continue node_data = {} @@ -2960,6 +2959,7 @@ class ZuulWeb(object): self.monitoring_server.start() self.component_registry = COMPONENT_REGISTRY.create(self.zk_client) + self.system = ZuulSystem(self.zk_client) self.system_config_thread = None self.system_config_cache_wake_event = threading.Event() @@ -2986,7 +2986,7 @@ class ZuulWeb(object): self.tenant_providers = {} self.layout_providers_store = LayoutProvidersStore( - self.zk_client, self.connections) + self.zk_client, self.connections, self.system.system_id) self.image_build_registry = ImageBuildRegistry(self.zk_client) self.image_upload_registry = ImageUploadRegistry(self.zk_client) self.nodes_cache = LockableZKObjectCache( diff --git a/zuul/zk/layout.py b/zuul/zk/layout.py index 635f991b99..3220685f37 100644 --- a/zuul/zk/layout.py +++ b/zuul/zk/layout.py @@ -225,9 +225,10 @@ class LayoutProvidersStore(ZooKeeperSimpleBase): tenant_root = "/zuul/tenant" - def __init__(self, client, connections): + def __init__(self, client, connections, system_id): super().__init__(client) self.connections = connections + self.system_id = system_id def get(self, context, tenant_name): path = f"{self.tenant_root}/{tenant_name}/provider" @@ -244,7 +245,7 @@ class LayoutProvidersStore(ZooKeeperSimpleBase): for provider_name in 
provider_names: provider_path = (f"{path}/{provider_name}/config") yield BaseProvider.fromZK( - context, provider_path, self.connections + context, provider_path, self.connections, self.system_id ) def set(self, context, tenant_name, providers):
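With system_id threaded through the objects above (ZuulSystem to LayoutProvidersStore to BaseProvider.fromZK and driver.getProvider, and from the provider into its endpoint), a provider or endpoint can build ownership metadata on its own at any point, as the commit message describes. Illustrative only, with key names taken from the tests and the upload job earlier in this patch:

    # Hypothetical helpers showing the metadata shape the cleanup matches on;
    # the patch builds these dictionaries inline rather than via helpers.
    def node_metadata(system_id, node_uuid):
        return {'zuul_system_id': system_id,
                'zuul_node_uuid': node_uuid}

    def upload_metadata(system_id, upload_uuid):
        return {'zuul_system_id': system_id,
                'zuul_upload_uuid': upload_uuid}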