diff --git a/tests/base.py b/tests/base.py
index 0ac35232db..4a7b865528 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -2551,6 +2551,7 @@ class ZuulTestCase(BaseTestCase):
         launcher = TestLauncher(
             self.config,
             launcher_connections)
+        launcher._start_cleanup = False
         launcher.start()
         return launcher
diff --git a/tests/fake_aws.py b/tests/fake_aws.py
index 70f3e339c8..27c1730a49 100644
--- a/tests/fake_aws.py
+++ b/tests/fake_aws.py
@@ -141,8 +141,8 @@ class FakeAws:

     def __init__(self):
         self.tasks = {}
-        self.ec2 = boto3.resource('ec2', region_name='us-west-2')
-        self.ec2_client = boto3.client('ec2', region_name='us-west-2')
+        self.ec2 = boto3.resource('ec2', region_name='us-east-1')
+        self.ec2_client = boto3.client('ec2', region_name='us-east-1')
         self.fail_import_count = 0

     def import_snapshot(self, *args, **kw):
@@ -165,7 +165,7 @@ class FakeAws:
         # Make a Volume to simulate the import finishing
         volume = self.ec2_client.create_volume(
             Size=80,
-            AvailabilityZone='us-west-2')
+            AvailabilityZone='us-east-1')
         snap_id = self.ec2_client.create_snapshot(
             VolumeId=volume['VolumeId'],
         )["SnapshotId"]
@@ -207,7 +207,7 @@ class FakeAws:

         volume = self.ec2_client.create_volume(
             Size=80,
-            AvailabilityZone='us-west-2')
+            AvailabilityZone='us-east-1')
         snap_id = self.ec2_client.create_snapshot(
             VolumeId=volume['VolumeId'],
             Description=f'imported volume import-ami-{task_id}',
diff --git a/tests/fake_openstack.py b/tests/fake_openstack.py
index 5fde0bb8b2..16e986112b 100644
--- a/tests/fake_openstack.py
+++ b/tests/fake_openstack.py
@@ -76,6 +76,10 @@ class FakeOpenstackFloatingIp(FakeOpenstackObject):
         }


+class FakeOpenstackPort(FakeOpenstackObject):
+    pass
+
+
 class FakeOpenstackCloud:
     log = logging.getLogger("zuul.FakeOpenstackCloud")
@@ -103,6 +107,7 @@ class FakeOpenstackCloud:
                 name='fake-network',
             )
         ]
+        self.ports = []

     def _getConnection(self):
         return FakeOpenstackConnection(self)
@@ -156,12 +161,29 @@ class FakeOpenstackConnection:
     def _has_floating_ips(self):
         return False

+    def delete_unattached_floating_ips(self):
+        for x in self.cloud.floating_ips[:]:  # copy: we mutate the list
+            if not getattr(x, 'port', None):
+                self.cloud.floating_ips.remove(x)
+
     def list_flavors(self, get_extra=False):
         return self.cloud.flavors

     def list_volumes(self):
         return self.cloud.volumes

+    def list_ports(self, filters=None):
+        if filters and filters.get('status'):
+            return [p for p in self.cloud.ports
+                    if p.status == filters['status']]
+        return self.cloud.ports
+
+    def delete_port(self, port_id):
+        for x in self.cloud.ports:
+            if x.id == port_id:
+                self.cloud.ports.remove(x)
+                return
+
     def get_network(self, name_or_id, filters=None):
         for x in self.cloud.networks:
             if x.id == name_or_id or x.name == name_or_id:
@@ -198,6 +220,7 @@ class FakeOpenstackConnection:
             addresses=addresses,
             interface_ip=interface_ip,
             flavor=flavor,
+            metadata=meta,
         )
         server = FakeOpenstackServer(**args)
         self.cloud.servers.append(server)
diff --git a/tests/fixtures/layouts/aws/nodepool-image-ebs-direct.yaml b/tests/fixtures/layouts/aws/nodepool-image-ebs-direct.yaml
index b5f7d8c4db..e623995d7f 100644
--- a/tests/fixtures/layouts/aws/nodepool-image-ebs-direct.yaml
+++ b/tests/fixtures/layouts/aws/nodepool-image-ebs-direct.yaml
@@ -69,13 +69,13 @@
       - name: debian-local

 - section:
-    name: aws-us-west-2
+    name: aws-us-east-1
     parent: aws-base
-    region: us-west-2
+    region: us-east-1

 - provider:
-    name: aws-us-west-2-main
-    section: aws-us-west-2
+    name: aws-us-east-1-main
+    section: aws-us-east-1
     labels:
       - name: debian-local-normal
         key-name: zuul
diff --git a/tests/fixtures/layouts/aws/nodepool-image-image.yaml b/tests/fixtures/layouts/aws/nodepool-image-image.yaml
index f4e7b5f8ac..ca35721524 100644
--- a/tests/fixtures/layouts/aws/nodepool-image-image.yaml
+++ b/tests/fixtures/layouts/aws/nodepool-image-image.yaml
@@ -69,13 +69,13 @@
         import-method: image

 - section:
-    name: aws-us-west-2
+    name: aws-us-east-1
     parent: aws-base
-    region: us-west-2
+    region: us-east-1

 - provider:
-    name: aws-us-west-2-main
-    section: aws-us-west-2
+    name: aws-us-east-1-main
+    section: aws-us-east-1
     labels:
       - name: debian-local-normal
         key-name: zuul
diff --git a/tests/fixtures/layouts/aws/nodepool-image-snapshot.yaml b/tests/fixtures/layouts/aws/nodepool-image-snapshot.yaml
index b347d50f93..c546c6eb6b 100644
--- a/tests/fixtures/layouts/aws/nodepool-image-snapshot.yaml
+++ b/tests/fixtures/layouts/aws/nodepool-image-snapshot.yaml
@@ -68,13 +68,13 @@
       - name: debian-local

 - section:
-    name: aws-us-west-2
+    name: aws-us-east-1
     parent: aws-base
-    region: us-west-2
+    region: us-east-1

 - provider:
-    name: aws-us-west-2-main
-    section: aws-us-west-2
+    name: aws-us-east-1-main
+    section: aws-us-east-1
     labels:
       - name: debian-local-normal
         key-name: zuul
diff --git a/tests/fixtures/layouts/openstack/nodepool.yaml b/tests/fixtures/layouts/openstack/nodepool.yaml
index 9f8c9903cc..b1069140bf 100644
--- a/tests/fixtures/layouts/openstack/nodepool.yaml
+++ b/tests/fixtures/layouts/openstack/nodepool.yaml
@@ -105,6 +105,8 @@
     launch-timeout: 600
     launch-attempts: 2
     key-name: zuul
+    floating-ip-cleanup: true
+    port-cleanup-interval: 1
     flavors:
       - name: normal
         flavor-name: Fake Flavor
diff --git a/tests/unit/test_aws_driver.py b/tests/unit/test_aws_driver.py
index 0fba590188..12ac85e5ca 100644
--- a/tests/unit/test_aws_driver.py
+++ b/tests/unit/test_aws_driver.py
@@ -17,10 +17,12 @@ import concurrent.futures
 import contextlib
 import time
 from unittest import mock
+import urllib.parse

 import fixtures
 from moto import mock_aws
 import boto3
+import botocore.exceptions

 from zuul.driver.aws import AwsDriver
 from zuul.driver.aws.awsmodel import AwsProviderNode
@@ -145,9 +147,9 @@ class TestAwsDriver(BaseCloudDriverTest):

         super().setUp()

-    def tearDown(self):
+    def shutdown(self):
+        super().shutdown()
         self.mock_aws.stop()
-        super().tearDown()

     def _assertProviderNodeAttributes(self, pnode):
         super()._assertProviderNodeAttributes(pnode)
@@ -289,6 +291,246 @@ class TestAwsDriver(BaseCloudDriverTest):
     def test_aws_diskimage_ebs_direct(self):
         self._test_diskimage()

+    @simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
+    def test_aws_resource_cleanup(self):
+        self.waitUntilSettled()
+        self.launcher.cleanup_worker.INTERVAL = 1
+        # This tests everything except the image imports
+        # Start by setting up leaked resources
+        system_id = self.launcher.system.system_id
+        instance_tags = [
+            {'Key': 'zuul_system_id', 'Value': system_id},
+            {'Key': 'zuul_node_uuid', 'Value': '0000000042'},
+        ]
+
+        s3_tags = {
+            'zuul_system_id': system_id,
+            'zuul_upload_uuid': '0000000042',
+        }
+
+        reservation = self.ec2_client.run_instances(
+            ImageId="ami-12c6146b", MinCount=1, MaxCount=1,
+            BlockDeviceMappings=[{
+                'DeviceName': '/dev/sda1',
+                'Ebs': {
+                    'VolumeSize': 80,
+                    'DeleteOnTermination': False
+                }
+            }],
+            TagSpecifications=[{
+                'ResourceType': 'instance',
+                'Tags': instance_tags
+            }, {
+                'ResourceType': 'volume',
+                'Tags': instance_tags
+            }]
+        )
+        instance_id = reservation['Instances'][0]['InstanceId']
+
+        bucket = self.s3.Bucket('zuul')
+        bucket.put_object(Body=b'hi',
+                          Key='testimage',
+                          Tagging=urllib.parse.urlencode(s3_tags))
+        obj = self.s3.Object('zuul', 'testimage')
+        # This effectively asserts the object exists
+        self.s3_client.get_object_tagging(
+            Bucket=obj.bucket_name, Key=obj.key)
+
+        instance = self.ec2.Instance(instance_id)
+        self.assertEqual(instance.state['Name'], 'running')
+
+        volume_id = list(instance.volumes.all())[0].id
+        volume = self.ec2.Volume(volume_id)
+        self.assertEqual(volume.state, 'in-use')
+
+        self.log.debug("Start cleanup worker")
+        self.launcher.cleanup_worker.start()
+
+        for _ in iterate_timeout(30, 'instance deletion'):
+            instance = self.ec2.Instance(instance_id)
+            if instance.state['Name'] == 'terminated':
+                break
+            time.sleep(1)
+
+        for _ in iterate_timeout(30, 'volume deletion'):
+            volume = self.ec2.Volume(volume_id)
+            try:
+                if volume.state == 'deleted':
+                    break
+            except botocore.exceptions.ClientError:
+                # Probably not found
+                break
+            time.sleep(1)
+
+        for _ in iterate_timeout(30, 'object deletion'):
+            obj = self.s3.Object('zuul', 'testimage')
+            try:
+                self.s3_client.get_object_tagging(
+                    Bucket=obj.bucket_name, Key=obj.key)
+            except self.s3_client.exceptions.NoSuchKey:
+                break
+            time.sleep(1)
+
+    @simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
+    def test_aws_resource_cleanup_import_snapshot(self):
+        # This tests the import_snapshot path
+        self.waitUntilSettled()
+        self.launcher.cleanup_worker.INTERVAL = 1
+        system_id = self.launcher.system.system_id
+
+        # Start by setting up leaked resources
+        image_tags = [
+            {'Key': 'zuul_system_id', 'Value': system_id},
+            {'Key': 'zuul_upload_uuid', 'Value': '0000000042'},
+        ]
+
+        task = self.fake_aws.import_snapshot(
+            DiskContainer={
+                'Format': 'ova',
+                'UserBucket': {
+                    'S3Bucket': 'zuul',
+                    'S3Key': 'testfile',
+                }
+            },
+            TagSpecifications=[{
+                'ResourceType': 'import-snapshot-task',
+                'Tags': image_tags,
+            }])
+        snapshot_id = self.fake_aws.finish_import_snapshot(task)
+
+        register_response = self.ec2_client.register_image(
+            Architecture='amd64',
+            BlockDeviceMappings=[
+                {
+                    'DeviceName': '/dev/sda1',
+                    'Ebs': {
+                        'DeleteOnTermination': True,
+                        'SnapshotId': snapshot_id,
+                        'VolumeSize': 20,
+                        'VolumeType': 'gp2',
+                    },
+                },
+            ],
+            RootDeviceName='/dev/sda1',
+            VirtualizationType='hvm',
+            Name='testimage',
+        )
+        image_id = register_response['ImageId']
+
+        ami = self.ec2.Image(image_id)
+        new_snapshot_id = ami.block_device_mappings[0]['Ebs']['SnapshotId']
+        self.fake_aws.change_snapshot_id(task, new_snapshot_id)
+
+        # Note that the resulting image and snapshot do not have tags
+        # applied, so we test the automatic retagging methods in the
+        # adapter.
+
+        image = self.ec2.Image(image_id)
+        self.assertEqual(image.state, 'available')
+
+        snap = self.ec2.Snapshot(snapshot_id)
+        self.assertEqual(snap.state, 'completed')
+
+        # Now that the leaked resources exist, start the worker and
+        # wait for it to clean them.
+        self.log.debug("Start cleanup worker")
+        self.launcher.cleanup_worker.start()
+
+        for _ in iterate_timeout(30, 'ami deletion'):
+            image = self.ec2.Image(image_id)
+            try:
+                # If this has a value the image was not deleted
+                if image.state == 'available':
+                    # Definitely not deleted yet
+                    pass
+            except AttributeError:
+                # Per AWS API, a recently deleted image is empty and
+                # looking at the state raises an AttributeError; see
+                # https://github.com/boto/boto3/issues/2531.  The image
+                # was deleted, so we continue on here
+                break
+            time.sleep(1)
+
+        for _ in iterate_timeout(30, 'snapshot deletion'):
+            snap = self.ec2.Snapshot(new_snapshot_id)
+            try:
+                if snap.state == 'deleted':
+                    break
+            except botocore.exceptions.ClientError:
+                # Probably not found
+                break
+            time.sleep(1)
+
+    @simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
+    def test_aws_resource_cleanup_import_image(self):
+        # This tests the import_image path
+        self.waitUntilSettled()
+        self.launcher.cleanup_worker.INTERVAL = 1
+        system_id = self.launcher.system.system_id
+
+        # Start by setting up leaked resources
+        image_tags = [
+            {'Key': 'zuul_system_id', 'Value': system_id},
+            {'Key': 'zuul_upload_uuid', 'Value': '0000000042'},
+        ]
+
+        # The image import path:
+        task = self.fake_aws.import_image(
+            DiskContainers=[{
+                'Format': 'ova',
+                'UserBucket': {
+                    'S3Bucket': 'zuul',
+                    'S3Key': 'testfile',
+                }
+            }],
+            TagSpecifications=[{
+                'ResourceType': 'import-image-task',
+                'Tags': image_tags,
+            }])
+        image_id, snapshot_id = self.fake_aws.finish_import_image(task)
+
+        # Note that the resulting image and snapshot do not have tags
+        # applied, so we test the automatic retagging methods in the
+        # adapter.
+
+        image = self.ec2.Image(image_id)
+        self.assertEqual(image.state, 'available')
+
+        snap = self.ec2.Snapshot(snapshot_id)
+        self.assertEqual(snap.state, 'completed')
+
+        # Now that the leaked resources exist, start the worker and
+        # wait for it to clean them.
+        self.log.debug("Start cleanup worker")
+        self.launcher.cleanup_worker.start()
+
+        for _ in iterate_timeout(30, 'ami deletion'):
+            image = self.ec2.Image(image_id)
+            try:
+                # If this has a value the image was not deleted
+                if image.state == 'available':
+                    # Definitely not deleted yet
+                    pass
+            except AttributeError:
+                # Per AWS API, a recently deleted image is empty and
+                # looking at the state raises an AttributeError; see
+                # https://github.com/boto/boto3/issues/2531.  The image
+                # was deleted, so we continue on here
+                break
+            time.sleep(1)
+
+        for _ in iterate_timeout(30, 'snapshot deletion'):
+            snap = self.ec2.Snapshot(snapshot_id)
+            try:
+                if snap.state == 'deleted':
+                    break
+            except botocore.exceptions.ClientError:
+                # Probably not found
+                break
+            time.sleep(1)
+
     @simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
     def test_state_machines_instance(self):
         self._test_state_machines("debian-normal")
diff --git a/tests/unit/test_launcher.py b/tests/unit/test_launcher.py
index 7f5054e047..7ed4ce60e1 100644
--- a/tests/unit/test_launcher.py
+++ b/tests/unit/test_launcher.py
@@ -168,10 +168,8 @@ class LauncherBaseTestCase(ZuulTestCase):
         self.mock_aws.start()
         # Must start responses after mock_aws
         self.useFixture(ImageMocksFixture())
-        self.s3 = boto3.resource('s3', region_name='us-west-2')
-        self.s3.create_bucket(
-            Bucket='zuul',
-            CreateBucketConfiguration={'LocationConstraint': 'us-west-2'})
+        self.s3 = boto3.resource('s3', region_name='us-east-1')
+        self.s3.create_bucket(Bucket='zuul')
         self.addCleanup(self.mock_aws.stop)
         self.patch(zuul.driver.aws.awsendpoint, 'CACHE_TTL', 1)
diff --git a/tests/unit/test_openstack_driver.py b/tests/unit/test_openstack_driver.py
index 79475ced84..7c6785790c 100644
--- a/tests/unit/test_openstack_driver.py
+++ b/tests/unit/test_openstack_driver.py
@@ -13,6 +13,7 @@
 # under the License.

 import os
+import time

 import fixtures

@@ -21,14 +22,17 @@ import zuul.driver.openstack.openstackendpoint

 from tests.fake_openstack import (
     FakeOpenstackCloud,
+    FakeOpenstackFloatingIp,
+    FakeOpenstackPort,
     FakeOpenstackProviderEndpoint,
 )
 from tests.base import (
     FIXTURE_DIR,
     ZuulTestCase,
-    simple_layout,
-    return_data,
     driver_config,
+    iterate_timeout,
+    return_data,
+    simple_layout,
 )
 from tests.unit.test_launcher import ImageMocksFixture
 from tests.unit.test_cloud_driver import BaseCloudDriverTest
@@ -88,9 +92,6 @@ class BaseOpenstackDriverTest(ZuulTestCase):
             'CACHE_TTL', 1)
         super().setUp()

-    def tearDown(self):
-        super().tearDown()
-

 class TestOpenstackDriver(BaseOpenstackDriverTest, BaseCloudDriverTest):
     def _assertProviderNodeAttributes(self, pnode):
@@ -133,8 +134,57 @@ class TestOpenstackDriver(BaseOpenstackDriverTest, BaseCloudDriverTest):
     def test_openstack_diskimage(self):
         self._test_diskimage()

+    # Openstack-driver specific tests
+
+    @simple_layout('layouts/openstack/nodepool.yaml', enable_nodepool=True)
+    def test_openstack_resource_cleanup(self):
+        self.waitUntilSettled()
+        self.launcher.cleanup_worker.INTERVAL = 1
+        conn = self.fake_cloud._getConnection()
+        system_id = self.launcher.system.system_id
+        tags = {
+            'zuul_system_id': system_id,
+            'zuul_node_uuid': '0000000042',
+        }
+        conn.create_server(
+            name="test",
+            meta=tags,
+        )
+        self.assertEqual(1, len(conn.list_servers()))
+
+        fip = FakeOpenstackFloatingIp(
+            id='42',
+            floating_ip_address='fake',
+            status='ACTIVE',
+        )
+        conn.cloud.floating_ips.append(fip)
+        self.assertEqual(1, len(conn.list_floating_ips()))
+
+        port = FakeOpenstackPort(
+            id='43',
+            status='DOWN',
+            device_owner='compute:foo',
+        )
+        conn.cloud.ports.append(port)
+        self.assertEqual(1, len(conn.list_ports()))
+
+        self.log.debug("Start cleanup worker")
+        self.launcher.cleanup_worker.start()
+
+        for _ in iterate_timeout(30, 'instance deletion'):
+            if not conn.list_servers():
+                break
+            time.sleep(1)
+        for _ in iterate_timeout(30, 'fip deletion'):
+            if not conn.list_floating_ips():
+                break
+            time.sleep(1)
+        for _ in iterate_timeout(30, 'port deletion'):
+            if not conn.list_ports():
+                break
+            time.sleep(1)
+

-# Openstack-driver specific tests
 class TestOpenstackDriverFloatingIp(BaseOpenstackDriverTest,
                                     BaseCloudDriverTest):
     # This test is for nova-net clouds with floating ips that require
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 2b54b884b0..63c0e6c079 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -2914,7 +2914,8 @@ class TenantParser(object):
                 provider = connection.driver.getProvider(
                     connection, tenant.name, provider_config.canonical_name,
-                    flat_config)
+                    flat_config,
+                    parse_context.scheduler.system.system_id)
                 shadow_layout.addProvider(provider)

         for e in parsed_config.queue_errors:
diff --git a/zuul/driver/aws/__init__.py b/zuul/driver/aws/__init__.py
index b302e66cda..366f7be7f9 100644
--- a/zuul/driver/aws/__init__.py
+++ b/zuul/driver/aws/__init__.py
@@ -28,9 +28,10 @@ class AwsDriver(Driver, EndpointCacheMixin,
         return awsconnection.AwsConnection(self, name, config)

     def getProvider(self, connection, tenant_name, canonical_name,
-                    provider_config):
+                    provider_config, system_id):
         return awsprovider.AwsProvider(
-            self, connection, tenant_name, canonical_name, provider_config)
+            self, connection, tenant_name, canonical_name, provider_config,
+            system_id)

     def getProviderClass(self):
         return awsprovider.AwsProvider
@@ -52,7 +53,8 @@ class AwsDriver(Driver, EndpointCacheMixin,
         ])
         return self.getEndpointById(
             endpoint_id,
-            create_args=(self, provider.connection, provider.region))
+            create_args=(self, provider.connection, provider.region,
+                         provider.system_id))

     def stop(self):
         self.stopEndpoints()
diff --git a/zuul/driver/aws/awsendpoint.py b/zuul/driver/aws/awsendpoint.py
index 7df530cbe8..a12069b788 100644
--- a/zuul/driver/aws/awsendpoint.py
+++ b/zuul/driver/aws/awsendpoint.py
@@ -363,9 +363,9 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
     IMAGE_UPLOAD_SLEEP = 30
     LAUNCH_TEMPLATE_PREFIX = 'zuul-launch-template'

-    def __init__(self, driver, connection, region):
+    def __init__(self, driver, connection, region, system_id):
         name = f'{connection.connection_name}-{region}'
-        super().__init__(driver, connection, name)
+        super().__init__(driver, connection, name, system_id)

         self.log = logging.getLogger(f"zuul.aws.{self.name}")
         self.region = region
@@ -409,6 +409,13 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
         self.aws_quotas = self.aws.client("service-quotas")
         self.ebs_client = self.aws.client('ebs')
         self.provider_label_template_names = {}
+        # In listResources, we reconcile AMIs which appear to be
+        # imports but have no nodepool tags, however it's possible
+        # that these aren't nodepool images.  If we determine that's
+        # the case, we'll add their ids here so we don't waste our
+        # time on that again.
+        self.not_our_images = set()
+        self.not_our_snapshots = set()

     def startEndpoint(self):
         self._running = True
@@ -496,7 +503,14 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
     def postConfig(self, provider):
         self._createLaunchTemplates(provider)

-    def listResources(self, bucket_name):
+    def listResources(self, providers):
+        bucket_names = set()
+        for provider in providers:
+            if bn := provider.object_storage.get('bucket-name'):
+                bucket_names.add(bn)
+        self._tagSnapshots()
+        self._tagAmis()
+
         for host in self._listHosts():
             try:
                 if host['State'].lower() in [
@@ -540,7 +554,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
                     continue
             yield AwsResource(tag_list_to_dict(snap.get('Tags')),
                               AwsResource.TYPE_SNAPSHOT, snap['SnapshotId'])
-        if bucket_name:
+        for bucket_name in bucket_names:
             for obj in self._listObjects(bucket_name):
                 with self.non_mutating_rate_limiter:
                     try:
@@ -549,10 +563,10 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
                     except botocore.exceptions.ClientError:
                         continue
                 yield AwsResource(tag_list_to_dict(tags['TagSet']),
-                                  AwsResource.TYPE_OBJECT, obj.key)
+                                  AwsResource.TYPE_OBJECT, obj.key,
+                                  bucket_name=bucket_name)

-    def deleteResource(self, resource, bucket_name):
-        self.deleteResource(resource, bucket_name)
+    def deleteResource(self, resource):
         self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
         if resource.type == AwsResource.TYPE_HOST:
             self._releaseHost(resource.id, immediate=True)
@@ -565,7 +579,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
         if resource.type == AwsResource.TYPE_SNAPSHOT:
             self._deleteSnapshot(resource.id)
         if resource.type == AwsResource.TYPE_OBJECT:
-            self._deleteObject(bucket_name, resource.id)
+            self._deleteObject(resource.bucket_name, resource.id)

     def listInstances(self):
         volumes = {}
@@ -901,14 +915,14 @@ class AwsProviderEndpoint(BaseProviderEndpoint):

     # Local implementation below

-    def _tagAmis(self, provider_name, not_our_images):
+    def _tagAmis(self):
         # There is no way to tag imported AMIs, so this routine
         # "eventually" tags them.  We look for any AMIs without tags
         # and we copy the tags from the associated snapshot or image
         # import task.
         to_examine = []
         for ami in self._listAmis():
-            if ami['ImageId'] in not_our_images:
+            if ami['ImageId'] in self.not_our_images:
                 continue
             if ami.get('Tags'):
                 continue
@@ -921,7 +935,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
                 # This was an import image (not snapshot) so let's
                 # try to find tags from the import task.
                 tags = tag_list_to_dict(task.get('Tags'))
-                if (tags.get('zuul_provider_name') == provider_name):
+                if (tags.get('zuul_system_id') == self.system_id):
                     # Copy over tags
                     self.log.debug(
                         "Copying tags from import task %s to AMI",
@@ -936,16 +950,16 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
             # any tags from the snapshot import task, otherwise, mark
             # it as an image we can ignore in future runs.
             if len(ami.get('BlockDeviceMappings', [])) < 1:
-                not_our_images.add(ami['ImageId'])
+                self.not_our_images.add(ami['ImageId'])
                 continue
             bdm = ami['BlockDeviceMappings'][0]
             ebs = bdm.get('Ebs')
             if not ebs:
-                not_our_images.add(ami['ImageId'])
+                self.not_our_images.add(ami['ImageId'])
                 continue
             snapshot_id = ebs.get('SnapshotId')
             if not snapshot_id:
-                not_our_images.add(ami['ImageId'])
+                self.not_our_images.add(ami['ImageId'])
                 continue
             to_examine.append((ami, snapshot_id))
         if not to_examine:
@@ -963,12 +977,11 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
                 task_map[task_snapshot_id] = task['Tags']

         for ami, snapshot_id in to_examine:
-            tags = task_map.get(snapshot_id)
+            tags = tag_list_to_dict(task_map.get(snapshot_id))
             if not tags:
-                not_our_images.add(ami['ImageId'])
+                self.not_our_images.add(ami['ImageId'])
                 continue
-            metadata = tag_list_to_dict(tags)
-            if (metadata.get('zuul_provider_name') == provider_name):
+            if (tags.get('zuul_system_id') == self.system_id):
                 # Copy over tags
                 self.log.debug(
                     "Copying tags from import task to image %s",
@@ -976,15 +989,15 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
                 with self.rate_limiter:
                     self.ec2_client.create_tags(
                         Resources=[ami['ImageId']],
-                        Tags=task['Tags'])
+                        Tags=task_map.get(snapshot_id))
             else:
-                not_our_images.add(ami['ImageId'])
+                self.not_our_images.add(ami['ImageId'])

-    def _tagSnapshots(self, provider_name, not_our_snapshots):
+    def _tagSnapshots(self):
         # See comments for _tagAmis
         to_examine = []
         for snap in self._listSnapshots():
-            if snap['SnapshotId'] in not_our_snapshots:
+            if snap['SnapshotId'] in self.not_our_snapshots:
                 continue
             try:
                 if snap.get('Tags'):
@@ -1004,7 +1017,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
                 # This was an import image (not snapshot) so let's
                 # try to find tags from the import task.
                 tags = tag_list_to_dict(task.get('Tags'))
-                if (tags.get('zuul_provider_name') == provider_name):
+                if (tags.get('zuul_system_id') == self.system_id):
                     # Copy over tags
                     self.log.debug(
                         f"Copying tags from import task {task_id}"
@@ -1034,12 +1047,11 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
                 task_map[task_snapshot_id] = task['Tags']

         for snap in to_examine:
-            tags = task_map.get(snap['SnapshotId'])
+            tags = tag_list_to_dict(task_map.get(snap['SnapshotId']))
             if not tags:
-                not_our_snapshots.add(snap['SnapshotId'])
+                self.not_our_snapshots.add(snap['SnapshotId'])
                 continue
-            metadata = tag_list_to_dict(tags)
-            if (metadata.get('zuul_provider_name') == provider_name):
+            if (tags.get('zuul_system_id') == self.system_id):
                 # Copy over tags
                 self.log.debug(
                     "Copying tags from import task to snapshot %s",
@@ -1047,9 +1059,9 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
                 with self.rate_limiter:
                     self.ec2_client.create_tags(
                         Resources=[snap['SnapshotId']],
-                        Tags=tags)
+                        Tags=task_map.get(snap['SnapshotId']))
             else:
-                not_our_snapshots.add(snap['SnapshotId'])
+                self.not_our_snapshots.add(snap['SnapshotId'])

     def _getImportImageTask(self, task_id):
         paginator = self.ec2_client.get_paginator(
@@ -1419,7 +1431,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
         self.log.info("Creating launch templates")
         tags = {
-            'zuul_managed': 'true',
+            'zuul_system_id': self.system_id,
             'zuul_provider_name': provider.canonical_name,
         }
         existing_templates = dict()  # for clean up and avoid creation attempt
@@ -1512,14 +1524,13 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
         for template_name, template in existing_templates.items():
             if template_name not in configured_templates:
                 # check if the template was created by the current provider
-                tags = template.get('Tags', [])
-                for tag in tags:
-                    if (tag['Key'] == 'zuul_provider_name' and
-                        tag['Value'] == provider.canonical_name):
-                        self.ec2_client.delete_launch_template(
-                            LaunchTemplateName=template_name)
-                        self.log.debug("Deleted unused launch template: %s",
-                                       template_name)
+                tags = tag_list_to_dict(template.get('Tags', []))
+                if (tags.get('zuul_system_id') == self.system_id and
+                    tags.get('zuul_provider_name') == provider.canonical_name):
+                    self.ec2_client.delete_launch_template(
+                        LaunchTemplateName=template_name)
+                    self.log.debug("Deleted unused launch template: %s",
+                                   template_name)

     def _getLaunchTemplateName(self, args):
         hasher = hashlib.sha256()
@@ -1793,7 +1804,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
         ids = []
         for (del_id, log) in records:
             ids.append(del_id)
-            log.debug(f"Deleting instance {del_id}")
+            log.debug("Deleting instance %s", del_id)
         count = len(ids)
         with self.rate_limiter(log.debug, f"Deleted {count} instances"):
             self.ec2_client.terminate_instances(InstanceIds=ids)
@@ -1805,7 +1816,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
         ids = []
         for (del_id, log) in records:
             ids.append(del_id)
-            log.debug(f"Releasing host {del_id}")
+            log.debug("Releasing host %s", del_id)
         count = len(ids)
         with self.rate_limiter(log.debug, f"Released {count} hosts"):
             self.ec2_client.release_hosts(HostIds=ids)
@@ -1817,7 +1828,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
             if host['HostId'] == external_id:
                 break
         else:
-            log.warning(f"Host not found when releasing {external_id}")
+            log.warning("Host not found when releasing %s", external_id)
             return None
         if immediate:
             with self.rate_limiter(log.debug, "Released host"):
@@ -1835,7 +1846,7 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
             if instance['InstanceId'] == external_id:
                 break
         else:
log.warning(f"Instance not found when deleting {external_id}") + log.warning("Instance not found when deleting %s", external_id) return None if immediate: with self.rate_limiter(log.debug, "Deleted instance"): @@ -1851,11 +1862,17 @@ class AwsProviderEndpoint(BaseProviderEndpoint): if volume['VolumeId'] == external_id: break else: - self.log.warning(f"Volume not found when deleting {external_id}") + self.log.warning("Volume not found when deleting %s", external_id) return None with self.rate_limiter(self.log.debug, "Deleted volume"): self.log.debug(f"Deleting volume {external_id}") - self.ec2_client.delete_volume(VolumeId=volume['VolumeId']) + try: + self.ec2_client.delete_volume(VolumeId=volume['VolumeId']) + except botocore.exceptions.ClientError as error: + if error.response['Error']['Code'] == 'NotFound': + self.log.warning( + "Volume not found when deleting %s", external_id) + return None return volume def _deleteAmi(self, external_id): @@ -1863,11 +1880,17 @@ class AwsProviderEndpoint(BaseProviderEndpoint): if ami['ImageId'] == external_id: break else: - self.log.warning(f"AMI not found when deleting {external_id}") + self.log.warning("AMI not found when deleting %s", external_id) return None with self.rate_limiter: self.log.debug(f"Deleting AMI {external_id}") - self.ec2_client.deregister_image(ImageId=ami['ImageId']) + try: + self.ec2_client.deregister_image(ImageId=ami['ImageId']) + except botocore.exceptions.ClientError as error: + if error.response['Error']['Code'] == 'NotFound': + self.log.warning( + "AMI not found when deleting %s", external_id) + return None return ami def _deleteSnapshot(self, external_id): @@ -1875,14 +1898,21 @@ class AwsProviderEndpoint(BaseProviderEndpoint): if snap['SnapshotId'] == external_id: break else: - self.log.warning(f"Snapshot not found when deleting {external_id}") + self.log.warning("Snapshot not found when deleting %s", + external_id) return None with self.rate_limiter: self.log.debug(f"Deleting Snapshot {external_id}") - self.ec2_client.delete_snapshot(SnapshotId=snap['SnapshotId']) + try: + self.ec2_client.delete_snapshot(SnapshotId=snap['SnapshotId']) + except botocore.exceptions.ClientError as error: + if error.response['Error']['Code'] == 'NotFound': + self.log.warning( + "Snapshot not found when deleting %s", external_id) + return None return snap def _deleteObject(self, bucket_name, external_id): with self.rate_limiter: - self.log.debug(f"Deleting object {external_id}") + self.log.debug("Deleting object %s", external_id) self.s3.Object(bucket_name, external_id).delete() diff --git a/zuul/driver/aws/awsmodel.py b/zuul/driver/aws/awsmodel.py index ef3fc5efea..07832bbf9d 100644 --- a/zuul/driver/aws/awsmodel.py +++ b/zuul/driver/aws/awsmodel.py @@ -76,6 +76,7 @@ class AwsResource(statemachine.Resource): TYPE_VOLUME = 'volume' TYPE_OBJECT = 'object' - def __init__(self, metadata, type, id): + def __init__(self, metadata, type, id, bucket_name=None): super().__init__(metadata, type) self.id = id + self.bucket_name = bucket_name diff --git a/zuul/driver/aws/awsprovider.py b/zuul/driver/aws/awsprovider.py index 939ab5d557..327b85349d 100644 --- a/zuul/driver/aws/awsprovider.py +++ b/zuul/driver/aws/awsprovider.py @@ -236,19 +236,6 @@ class AwsProvider(BaseProvider, subclass_id='aws'): log = logging.getLogger("zuul.AwsProvider") schema = AwsProviderSchema().getProviderSchema() - def __init__(self, *args, **kw): - super().__init__(*args, **kw) - # In listResources, we reconcile AMIs which appear to be - # imports but have no nodepool tags, 
-        # that these aren't nodepool images.  If we determine that's
-        # the case, we'll add their ids here so we don't waste our
-        # time on that again.  We do not need to serialize these;
-        # these are ephemeral caches.
-        self._set(
-            not_our_images=set(),
-            not_our_snapshots=set(),
-        )
-
     @property
     def endpoint(self):
         ep = getattr(self, '_endpoint', None)
@@ -305,19 +292,6 @@ class AwsProvider(BaseProvider, subclass_id='aws'):
     def listInstances(self):
         return self.endpoint.listInstances()

-    def listResources(self):
-        bucket_name = self.object_storage.get('bucket-name')
-
-        self.endpoint._tagSnapshots(
-            self.tenant_scoped_name, self.not_our_snapshots)
-        self.endpoint._tagAmis(
-            self.tenant_scoped_name, self.not_our_images)
-        return self.endpoint.listResources(bucket_name)
-
-    def deleteResource(self, resource):
-        bucket_name = self.object_storage.get('bucket-name')
-        self.endpoint.deleteResource(resource, bucket_name)
-
     def getQuotaLimits(self):
         # Get the instance and volume types that this provider handles
         instance_types = {}
diff --git a/zuul/driver/openstack/__init__.py b/zuul/driver/openstack/__init__.py
index b6be116982..8849725dc7 100644
--- a/zuul/driver/openstack/__init__.py
+++ b/zuul/driver/openstack/__init__.py
@@ -33,9 +33,10 @@ class OpenstackDriver(Driver, EndpointCacheMixin,
         return openstackconnection.OpenstackConnection(self, name, config)

     def getProvider(self, connection, tenant_name, canonical_name,
-                    provider_config):
+                    provider_config, system_id):
         return openstackprovider.OpenstackProvider(
-            self, connection, tenant_name, canonical_name, provider_config)
+            self, connection, tenant_name, canonical_name, provider_config,
+            system_id)

     def getProviderClass(self):
         return openstackprovider.OpenstackProvider
@@ -46,17 +47,19 @@ class OpenstackDriver(Driver, EndpointCacheMixin,
     def getProviderNodeClass(self):
         return openstackmodel.OpenstackProviderNode

-    def _getEndpoint(self, connection, region):
+    def _getEndpoint(self, connection, region, system_id):
         region_str = region or ''
         endpoint_id = '/'.join([
             urllib.parse.quote_plus(connection.connection_name),
             urllib.parse.quote_plus(region_str),
         ])
-        return self.getEndpointById(endpoint_id,
-                                    create_args=(self, connection, region))
+        return self.getEndpointById(
+            endpoint_id,
+            create_args=(self, connection, region, system_id))

     def getEndpoint(self, provider):
-        return self._getEndpoint(provider.connection, provider.region)
+        return self._getEndpoint(provider.connection, provider.region,
+                                 provider.system_id)

     def stop(self):
         self.stopEndpoints()
diff --git a/zuul/driver/openstack/openstackendpoint.py b/zuul/driver/openstack/openstackendpoint.py
index 43ea81aaf1..4c749a2f18 100644
--- a/zuul/driver/openstack/openstackendpoint.py
+++ b/zuul/driver/openstack/openstackendpoint.py
@@ -357,9 +357,9 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):

     IMAGE_UPLOAD_SLEEP = 30

-    def __init__(self, driver, connection, region):
+    def __init__(self, driver, connection, region, system_id):
         name = f'{connection.connection_name}-{region}'
-        super().__init__(driver, connection, name)
+        super().__init__(driver, connection, name, system_id)

         self.log = logging.getLogger(f"zuul.openstack.{self.name}")
         self.region = region
@@ -411,7 +411,7 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):
         self.api_executor.shutdown()
         self._running = False

-    def listResources(self):
+    def listResources(self, providers):
         for server in self._listServers():
             if server['status'].lower() == 'deleted':
                 continue
@@ -422,9 +422,12 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):
         # automatic resource cleanup in cleanupLeakedResources because
         # openstack doesn't store metadata on those objects, so we
         # call internal cleanup methods here.
-        if self.provider.port_cleanup_interval:
-            self._cleanupLeakedPorts()
-        if self.provider.clean_floating_ips:
+        intervals = [p.port_cleanup_interval for p in providers
+                     if p.port_cleanup_interval]
+        interval = min(intervals or [0])
+        if interval:
+            self._cleanupLeakedPorts(interval)
+        if any([p.floating_ip_cleanup for p in providers]):
             self._cleanupFloatingIps()

     def deleteResource(self, resource):
@@ -920,7 +923,7 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):
                 ret.append(p)
         return ret

-    def _cleanupLeakedPorts(self):
+    def _cleanupLeakedPorts(self, interval):
         if not self._last_port_cleanup:
             self._last_port_cleanup = time.monotonic()
             ports = self._listPorts(status='DOWN')
@@ -930,7 +933,7 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):

         # Return if not enough time has passed between cleanup
         last_check_in_secs = int(time.monotonic() - self._last_port_cleanup)
-        if last_check_in_secs <= self.provider.port_cleanup_interval:
+        if last_check_in_secs <= interval:
             return

         ports = self._listPorts(status='DOWN')
@@ -944,15 +947,15 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):
                 self._deletePort(port_id)
             except Exception:
                 self.log.exception("Exception deleting port %s in %s:",
-                                   port_id, self.provider.name)
+                                   port_id, self.name)
             else:
                 removed_count += 1
                 self.log.debug("Removed DOWN port %s in %s",
-                               port_id, self.provider.name)
+                               port_id, self.name)

-        if self._statsd and removed_count:
-            key = 'nodepool.provider.%s.leaked.ports' % (self.provider.name)
-            self._statsd.incr(key, removed_count)
+        # if self._statsd and removed_count:
+        #     key = 'nodepool.provider.%s.leaked.ports' % (self.name)
+        #     self._statsd.incr(key, removed_count)

         self._last_port_cleanup = time.monotonic()
@@ -973,10 +976,10 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):
         # indicate something happened.
         if type(did_clean) is bool:
             did_clean = 1
-        if self._statsd:
-            key = ('nodepool.provider.%s.leaked.floatingips'
-                   % self.provider.name)
-            self._statsd.incr(key, did_clean)
+        # if self._statsd:
+        #     key = ('nodepool.provider.%s.leaked.floatingips'
+        #            % self.name)
+        #     self._statsd.incr(key, did_clean)

     def getConsoleLog(self, label, external_id):
         if not label.console_log:
diff --git a/zuul/driver/openstack/openstackprovider.py b/zuul/driver/openstack/openstackprovider.py
index bd9173b6aa..4f2ca0e0da 100644
--- a/zuul/driver/openstack/openstackprovider.py
+++ b/zuul/driver/openstack/openstackprovider.py
@@ -167,6 +167,8 @@ class OpenstackProviderSchema(BaseProviderSchema):
         openstack_provider_schema = vs.Schema({
             Optional('region'): Nullable(str),
             Optional('resource-limits', default=dict()): resource_limits,
+            Optional('floating-ip-cleanup', default=False): bool,
+            Optional('port-cleanup-interval', default=600): int,
         })

         return assemble(
@@ -182,19 +184,6 @@ class OpenstackProvider(BaseProvider, subclass_id='openstack'):
     log = logging.getLogger("zuul.OpenstackProvider")
     schema = OpenstackProviderSchema().getProviderSchema()

-    def __init__(self, *args, **kw):
-        super().__init__(*args, **kw)
-        # In listResources, we reconcile AMIs which appear to be
-        # imports but have no nodepool tags, however it's possible
-        # that these aren't nodepool images.  If we determine that's
-        # the case, we'll add their ids here so we don't waste our
-        # time on that again.  We do not need to serialize these;
-        # these are ephemeral caches.
-        self._set(
-            not_our_images=set(),
-            not_our_snapshots=set(),
-        )
-
     @property
     def endpoint(self):
         ep = getattr(self, '_endpoint', None)
@@ -207,7 +196,7 @@ class OpenstackProvider(BaseProvider, subclass_id='openstack'):
         # We are not fully constructed yet at this point, so we need
         # to peek to get the region and endpoint.
         region = provider_config.get('region')
-        endpoint = connection.driver._getEndpoint(connection, region)
+        endpoint = connection.driver._getEndpoint(connection, region, None)
         return OpenstackProviderImage(
             image_config, provider_config,
             image_format=endpoint.getImageFormat())
@@ -245,14 +234,6 @@ class OpenstackProvider(BaseProvider, subclass_id='openstack'):
     def listInstances(self):
         return self.endpoint.listInstances()

-    def listResources(self):
-        # TODO: implement
-        return []
-
-    def deleteResource(self, resource):
-        # TODO: implement
-        pass
-
     def getQuotaLimits(self):
         cloud = self.endpoint.getQuotaLimits()
         zuul = QuotaInformation(default=math.inf, **self.resource_limits)
diff --git a/zuul/launcher/server.py b/zuul/launcher/server.py
index 190d7486e9..ef1764a93c 100644
--- a/zuul/launcher/server.py
+++ b/zuul/launcher/server.py
@@ -81,6 +81,10 @@ def scores_for_label(label_cname, candidate_names):
     }


+def endpoint_score(endpoint, launcher_name):
+    return mmh3.hash(f"{endpoint.canonical_name}-{launcher_name}",
+                     signed=False)
+
+
 class NodesetRequestError(Exception):
     """Errors that should lead to the request being declined."""
     pass
@@ -238,8 +242,10 @@ class EndpointUploadJob:
                 raise Exception(
                     f"Unable to find image {self.upload.canonical_name}")

-            # TODO: add upload id, etc
-            metadata = {}
+            metadata = {
+                'zuul_system_id': self.launcher.system.system_id,
+                'zuul_upload_uuid': self.upload.uuid,
+            }
             image_name = f'{provider_image.name}-{self.artifact.uuid}'
             external_id = provider.uploadImage(
                 provider_image, image_name, self.path, self.artifact.format,
@@ -667,6 +673,133 @@ class NodescanWorker:
             last_unready_check = time.monotonic()


+class CleanupWorker:
+    # Delay 60 seconds between iterations
+    INTERVAL = 60
+    log = logging.getLogger("zuul.Launcher")
+
+    def __init__(self, launcher):
+        self.launcher = launcher
+        self.wake_event = threading.Event()
+        self.possibly_leaked_nodes = {}
+        self.possibly_leaked_uploads = {}
+        self._running = False
+        self.thread = None
+
+    def start(self):
+        self.log.debug("Starting cleanup worker thread")
+        self._running = True
+        self.thread = threading.Thread(target=self.run,
+                                       name="CleanupWorker")
+        self.thread.start()
+
+    def stop(self):
+        self.log.debug("Stopping cleanup worker")
+        self._running = False
+        self.wake_event.set()
+
+    def join(self):
+        self.log.debug("Joining cleanup thread")
+        if self.thread:
+            self.thread.join()
+        self.log.debug("Joined cleanup thread")
+
+    def run(self):
+        while self._running:
+            # Wait before performing the first cleanup
+            self.wake_event.wait(self.INTERVAL)
+            self.wake_event.clear()
+            try:
+                self._run()
+            except Exception:
+                self.log.exception("Error in cleanup worker:")
+
+    def getMatchingEndpoints(self):
+        all_launchers = {
+            c.hostname: c
+            for c in COMPONENT_REGISTRY.registry.all("launcher")}
+
+        for endpoint in self.launcher.endpoints.values():
+            candidate_launchers = {
+                n: c for n, c in all_launchers.items()
+                if not c.connection_filter
+                or endpoint.connection.connection_name in c.connection_filter}
+            candidate_names = set(candidate_launchers)
+            launcher_scores = {endpoint_score(endpoint, n): n
+                               for n in candidate_names}
+            sorted_scores = sorted(launcher_scores.items())
+            for score, launcher_name in sorted_scores:
+                launcher = candidate_launchers.get(launcher_name)
+                if not launcher:
+                    # Launcher is no longer online
+                    continue
+                if launcher.state != launcher.RUNNING:
+                    continue
+                if launcher.hostname == self.launcher.component_info.hostname:
+                    yield endpoint
+                break
+
+    def _run(self):
+        for endpoint in self.getMatchingEndpoints():
+            try:
+                self.cleanupLeakedResources(endpoint)
+            except Exception:
+                self.log.exception("Error in cleanup worker:")
+
+    def cleanupLeakedResources(self, endpoint):
+        newly_leaked_nodes = {}
+        newly_leaked_uploads = {}
+
+        # Get a list of all providers that share this endpoint.  This
+        # is because some providers may store resources like image
+        # uploads in multiple per-provider locations.
+        providers = [
+            p for p in self.launcher._getProviders()
+            if p.getEndpoint().canonical_name == endpoint.canonical_name
+        ]
+
+        for resource in endpoint.listResources(providers):
+            if (resource.metadata.get('zuul_system_id') !=
+                    self.launcher.system.system_id):
+                continue
+            node_id = resource.metadata.get('zuul_node_uuid')
+            upload_id = resource.metadata.get('zuul_upload_uuid')
+            if node_id and self.launcher.api.getProviderNode(node_id) is None:
+                newly_leaked_nodes[node_id] = resource
+                if node_id in self.possibly_leaked_nodes:
+                    # We've seen this twice now, so it's not a race
+                    # condition.
+                    try:
+                        endpoint.deleteResource(resource)
+                        # if self._statsd:
+                        #     key = ('nodepool.provider.%s.leaked.%s'
+                        #            % (self.provider.name,
+                        #               resource.plural_metric_name))
+                        #     self._statsd.incr(key, 1)
+                    except Exception:
+                        self.log.exception("Unable to delete leaked "
+                                           f"resource for node {node_id}")
+            if (upload_id and
+                    self.launcher.image_upload_registry.getItem(
+                        upload_id) is None):
+                newly_leaked_uploads[upload_id] = resource
+                if upload_id in self.possibly_leaked_uploads:
+                    # We've seen this twice now, so it's not a race
+                    # condition.
+                    try:
+                        endpoint.deleteResource(resource)
+                        # if self._statsd:
+                        #     key = ('nodepool.provider.%s.leaked.%s'
+                        #            % (self.provider.name,
+                        #               resource.plural_metric_name))
+                        #     self._statsd.incr(key, 1)
+                    except Exception:
+                        self.log.exception(
+                            "Unable to delete leaked "
+                            f"resource for upload {upload_id}")
+        self.possibly_leaked_nodes = newly_leaked_nodes
+        self.possibly_leaked_uploads = newly_leaked_uploads
+
+
 class Launcher:
     log = logging.getLogger("zuul.Launcher")
     # Max. time to wait for a cache to sync
@@ -678,6 +811,11 @@ class Launcher:

     def __init__(self, config, connections):
         self._running = True
+        # The cleanup method requires some extra AWS mocks that are
+        # not enabled in most tests, so we allow the test suite to
+        # disable it.
+        # TODO: switch basic launcher tests to openstack and remove.
+        self._start_cleanup = True
         self.config = config
         self.connections = connections
         self.repl = None
@@ -740,7 +878,7 @@ class Launcher:
         self.tenant_layout_state = LayoutStateStore(
             self.zk_client, self._layoutUpdatedCallback)
         self.layout_providers_store = LayoutProvidersStore(
-            self.zk_client, self.connections)
+            self.zk_client, self.connections, self.system.system_id)
         self.local_layout_state = {}

         self.image_build_registry = ImageBuildRegistry(
@@ -768,6 +906,7 @@ class Launcher:
             max_workers=10,
             thread_name_prefix="UploadWorker",
         )
+        self.cleanup_worker = CleanupWorker(self)

     def _layoutUpdatedCallback(self):
         self.layout_updated_event.set()
@@ -1477,6 +1616,11 @@ class Launcher:
             raise ProviderNodeError(
                 f"Unable to find {provider_name} in tenant {tenant_name}")

+    def _getProviders(self):
+        for providers in self.tenant_providers.values():
+            for provider in providers:
+                yield provider
+
     def _hasProvider(self, node):
         try:
             self._getProviderForNode(node)
@@ -1506,6 +1650,10 @@ class Launcher:
         self.log.debug("Starting launcher thread")
         self.launcher_thread.start()

+        if self._start_cleanup:
+            self.log.debug("Starting cleanup thread")
+            self.cleanup_worker.start()
+
     def stop(self):
         self.log.debug("Stopping launcher")
         self._running = False
@@ -1514,10 +1662,12 @@ class Launcher:
         self.stopRepl()
         self._command_running = False
         self.command_socket.stop()
-        self.connections.stop()
+        self.cleanup_worker.stop()
+        self.cleanup_worker.join()
         self.upload_executor.shutdown()
         self.endpoint_upload_executor.shutdown()
         self.nodescan_worker.stop()
+        self.connections.stop()
         # Endpoints are stopped by drivers
         self.log.debug("Stopped launcher")
diff --git a/zuul/provider/__init__.py b/zuul/provider/__init__.py
index 9b59588e9d..418f633725 100644
--- a/zuul/provider/__init__.py
+++ b/zuul/provider/__init__.py
@@ -118,10 +118,11 @@ class BaseProviderEndpoint(metaclass=abc.ABCMeta):
     the unit of visibility of instances, VPCs, images, etc.
     """

-    def __init__(self, driver, connection, name):
+    def __init__(self, driver, connection, name, system_id):
         self.driver = driver
         self.connection = connection
         self.name = name
+        self.system_id = system_id
         self.start_lock = threading.Lock()
         self.started = False
         self.stopped = False
@@ -210,7 +211,8 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin,
     def __init__(self, *args):
         super().__init__()
         if args:
-            (driver, connection, tenant_name, canonical_name, config) = args
+            (driver, connection, tenant_name, canonical_name, config,
+             system_id) = args
             config = config.copy()
             config.pop('_source_context')
             config.pop('_start_mark')
@@ -223,6 +225,7 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin,
                 tenant_name=tenant_name,
                 canonical_name=canonical_name,
                 config=config,
+                system_id=system_id,
                 **parsed_config,
             )

@@ -231,7 +234,7 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin,
                 f"canonical_name={self.canonical_name}>")

     @classmethod
-    def fromZK(cls, context, path, connections):
+    def fromZK(cls, context, path, connections, system_id):
         """Deserialize a Provider (subclass) from ZK.

         To deserialize a Provider from ZK, pass the connection
@@ -246,8 +249,11 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin,
             extra = {'connections': connections}
             obj = cls._fromRaw(raw_data, zstat, extra)
             connection = connections.connections[obj.connection_name]
-            obj._set(connection=connection,
-                     driver=connection.driver)
+            obj._set(
+                connection=connection,
+                driver=connection.driver,
+                system_id=system_id,
+            )
         return obj

     def getProviderSchema(self):
@@ -582,6 +588,10 @@ class EndpointCacheMixin:
             self.endpoints[endpoint_id] = endpoint
         return endpoint

+    def getEndpoints(self):
+        with self.endpoints_lock:
+            return list(self.endpoints.values())
+
     def stopEndpoints(self):
         with self.endpoints_lock:
             for endpoint in self.endpoints.values():
diff --git a/zuul/provider/statemachine.py b/zuul/provider/statemachine.py
index 877dac41aa..7ab1e59257 100644
--- a/zuul/provider/statemachine.py
+++ b/zuul/provider/statemachine.py
@@ -145,3 +145,6 @@ class Resource:
         self.type = type
         self.plural_metric_name = type + 's'
         self.metadata = metadata
+
+    def __repr__(self):
+        return f'<{self.__class__.__name__} {self.type} {self.metadata}>'
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 40ff9829a3..498e54ae8d 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -345,7 +345,7 @@ class Scheduler(threading.Thread):
             self.zk_client,
             self.layout_update_event.set)
         self.layout_providers_store = LayoutProvidersStore(
-            self.zk_client, self.connections)
+            self.zk_client, self.connections, self.system.system_id)
         self.local_layout_state = {}

         command_socket = get_default(
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
index bb07b55a76..ebeccdd4bc 100755
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -1121,7 +1121,6 @@ class ZuulWebAPI(object):
     def __init__(self, zuulweb):
         self.zuulweb = zuulweb
         self.zk_client = zuulweb.zk_client
-        self.system = ZuulSystem(self.zk_client)
         self.zk_nodepool = ZooKeeperNodepool(self.zk_client,
                                              enable_node_cache=True)
         self.status_caches = {}
@@ -2170,7 +2169,7 @@ class ZuulWebAPI(object):
             if not (node.user_data and
                     isinstance(node.user_data, dict) and
                     node.user_data.get('zuul_system') ==
-                    self.system.system_id and
+                    self.zuulweb.system.system_id and
                     node.tenant_name == tenant_name):
                 continue
             node_data = {}
@@ -2960,6 +2959,7 @@ class ZuulWeb(object):
         self.monitoring_server.start()

         self.component_registry = COMPONENT_REGISTRY.create(self.zk_client)
+        self.system = ZuulSystem(self.zk_client)

         self.system_config_thread = None
         self.system_config_cache_wake_event = threading.Event()
@@ -2986,7 +2986,7 @@ class ZuulWeb(object):

         self.tenant_providers = {}
         self.layout_providers_store = LayoutProvidersStore(
-            self.zk_client, self.connections)
+            self.zk_client, self.connections, self.system.system_id)
         self.image_build_registry = ImageBuildRegistry(self.zk_client)
         self.image_upload_registry = ImageUploadRegistry(self.zk_client)
         self.nodes_cache = LockableZKObjectCache(
diff --git a/zuul/zk/layout.py b/zuul/zk/layout.py
index 635f991b99..3220685f37 100644
--- a/zuul/zk/layout.py
+++ b/zuul/zk/layout.py
@@ -225,9 +225,10 @@ class LayoutProvidersStore(ZooKeeperSimpleBase):

     tenant_root = "/zuul/tenant"

-    def __init__(self, client, connections):
+    def __init__(self, client, connections, system_id):
         super().__init__(client)
         self.connections = connections
+        self.system_id = system_id

     def get(self, context, tenant_name):
         path = f"{self.tenant_root}/{tenant_name}/provider"
@@ -244,7 +245,7 @@ class LayoutProvidersStore(ZooKeeperSimpleBase):
         for provider_name in provider_names:
             provider_path = (f"{path}/{provider_name}/config")
             yield BaseProvider.fromZK(
-                context, provider_path, self.connections
+                context, provider_path, self.connections, self.system_id
             )

     def set(self, context, tenant_name, providers):
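
The CleanupWorker above deliberately requires a resource to be seen as leaked
on two consecutive passes before deleting it, so a node that is merely
mid-creation (already tagged in the cloud but not yet recorded in ZooKeeper)
is not deleted by mistake. A minimal sketch of that two-pass pattern, using
hypothetical stand-ins (LeakDetector, is_tracked) rather than the real
launcher API:

    # Two-pass leak detection: delete only what has been seen leaked on
    # two consecutive scans.  `endpoint` needs a deleteResource() method
    # and `is_tracked` reports whether an id is still known to the system.
    class LeakDetector:
        def __init__(self, endpoint, is_tracked):
            self.endpoint = endpoint
            self.is_tracked = is_tracked
            self.possibly_leaked = {}

        def run_once(self, resources):
            newly_leaked = {}
            for resource in resources:
                rid = resource.metadata.get('zuul_node_uuid')
                if not rid or self.is_tracked(rid):
                    continue
                newly_leaked[rid] = resource
                if rid in self.possibly_leaked:
                    # Seen leaked twice in a row: not a race, delete it.
                    self.endpoint.deleteResource(resource)
            # Resources seen only once wait for the next pass.
            self.possibly_leaked = newly_leaked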
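
Only one launcher should clean each endpoint, and getMatchingEndpoints
achieves that without any coordination: every launcher computes the same
deterministic ranking and only acts when it ranks first. A sketch of the
selection, assuming the pair-hashing form of endpoint_score used above and
ignoring connection filters and launcher state:

    import mmh3

    def endpoint_score(endpoint_cname, launcher_name):
        # Stable, unsigned murmur3 score per endpoint/launcher pair.
        return mmh3.hash(f"{endpoint_cname}-{launcher_name}", signed=False)

    def cleanup_owner(endpoint_cname, launcher_names):
        # Every launcher computes this identically, so exactly one of
        # them considers itself the owner of a given endpoint.
        scores = {endpoint_score(endpoint_cname, n): n
                  for n in launcher_names}
        return scores[min(scores)]

    print(cleanup_owner("aws/us-east-1", {"launcher-a", "launcher-b"}))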
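
Since several OpenStack providers can share one endpoint, the new
listResources(providers) has to reconcile their per-provider settings: ports
are cleaned at the shortest configured port-cleanup-interval, and floating-ip
cleanup runs if any provider sharing the endpoint enables it. A small
illustration of that aggregation with made-up provider values:

    # A port_cleanup_interval of 0 (or unset) opts that provider out;
    # the shortest remaining interval wins for the whole endpoint.
    providers = [
        {"port_cleanup_interval": 600, "floating_ip_cleanup": False},
        {"port_cleanup_interval": 0, "floating_ip_cleanup": True},
    ]
    intervals = [p["port_cleanup_interval"] for p in providers
                 if p["port_cleanup_interval"]]
    interval = min(intervals or [0])                   # -> 600
    do_fip_cleanup = any(p["floating_ip_cleanup"]
                         for p in providers)           # -> True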