From da21971e9b3d2714ded0f51667c92b572896c641 Mon Sep 17 00:00:00 2001 From: Clint Byrum Date: Tue, 2 Apr 2019 16:00:50 -0700 Subject: [PATCH] Implement max-servers for AWS driver Max-servers has been ignored up until now. This implements the basic checks so that AWS users can limit the number of instances they launch. Change-Id: I73296bde1cdde80c52b6b5b725f268a17562060d --- nodepool/driver/aws/handler.py | 47 +++++++++++++++++++++++- nodepool/driver/aws/provider.py | 33 ++++++++++++++--- nodepool/tests/fixtures/aws.yaml | 2 +- nodepool/tests/unit/test_driver_aws.py | 51 ++++++++++++++++++++------ 4 files changed, 114 insertions(+), 19 deletions(-) diff --git a/nodepool/driver/aws/handler.py b/nodepool/driver/aws/handler.py index 47da54d56..cf1f32a0b 100644 --- a/nodepool/driver/aws/handler.py +++ b/nodepool/driver/aws/handler.py @@ -13,11 +13,12 @@ # under the License. import logging +import math import time from nodepool import exceptions from nodepool import zk -from nodepool.driver.utils import NodeLauncher +from nodepool.driver.utils import NodeLauncher, QuotaInformation from nodepool.driver import NodeRequestHandler from nodepool.nodeutils import nodescan @@ -25,6 +26,7 @@ from nodepool.nodeutils import nodescan class AwsInstanceLauncher(NodeLauncher): def __init__(self, handler, node, provider_config, provider_label): super().__init__(handler.zk, node, provider_config) + self.provider_name = provider_config.name self.retries = provider_config.launch_retries self.pool = provider_config.pools[provider_label.pool.name] self.handler = handler @@ -60,6 +62,10 @@ class AwsInstanceLauncher(NodeLauncher): if state == 'running': instance.create_tags(Tags=[{'Key': 'nodepool_id', 'Value': str(self.node.id)}]) + instance.create_tags(Tags=[{'Key': 'nodepool_pool', + 'Value': str(self.pool.name)}]) + instance.create_tags(Tags=[{'Key': 'nodepool_provider', + 'Value': str(self.provider_name)}]) break time.sleep(0.5) instance.reload() @@ -127,6 +133,45 @@ class 
AwsNodeRequestHandler(NodeRequestHandler): return False return True + def hasRemainingQuota(self, ntype): + ''' + Apply max_servers check, ignoring other quotas. + + :returns: True if we have room, False otherwise. + ''' + needed_quota = QuotaInformation(cores=1, instances=1, ram=1, default=1) + n_running = self.manager.countNodes(self.pool.name) + pool_quota = QuotaInformation( + cores=math.inf, + instances=self.pool.max_servers - n_running, + ram=math.inf, + default=math.inf) + pool_quota.subtract(needed_quota) + self.log.debug("hasRemainingQuota({},{}) = {}".format( + self.pool, ntype, pool_quota)) + return pool_quota.non_negative() + + def hasProviderQuota(self, node_types): + ''' + Apply max_servers check to a whole request + + :returns: True if we have room, False otherwise. + ''' + needed_quota = QuotaInformation( + cores=1, + instances=len(node_types), + ram=1, + default=1) + pool_quota = QuotaInformation( + cores=math.inf, + instances=self.pool.max_servers, + ram=math.inf, + default=math.inf) + pool_quota.subtract(needed_quota) + self.log.debug("hasProviderQuota({},{}) = {}".format( + self.pool, node_types, pool_quota)) + return pool_quota.non_negative() + def launchesComplete(self): ''' Check if all launch requests have completed. 
diff --git a/nodepool/driver/aws/provider.py b/nodepool/driver/aws/provider.py index e25202628..112700464 100644 --- a/nodepool/driver/aws/provider.py +++ b/nodepool/driver/aws/provider.py @@ -27,11 +27,14 @@ class AwsInstance: if metadatas: for metadata in metadatas: if metadata["Key"] == "nodepool_id": - self.metadata = { - 'nodepool_provider_name': provider.name, - 'nodepool_node_id': metadata["Value"], - } - break + self.metadata['nodepool_node_id'] = metadata["Value"] + continue + if metadata["Key"] == "nodepool_pool": + self.metadata['nodepool_pool_name'] = metadata["Value"] + continue + if metadata["Key"] == "nodepool_provider": + self.metadata['nodepool_provider_name'] = metadata["Value"] + continue def get(self, name, default=None): return getattr(self, name, default) @@ -65,10 +68,30 @@ class AwsProvider(Provider): for instance in self.ec2.instances.all(): if instance.state["Name"].lower() == "terminated": continue + ours = False + if instance.tags: + for tag in instance.tags: + if (tag["Key"] == 'nodepool_provider' + and tag["Value"] == self.provider.name): + ours = True + break + if not ours: + continue servers.append(AwsInstance( instance.id, instance.tags, self.provider)) return servers + def countNodes(self, pool=None): + n = 0 + for instance in self.listNodes(): + if pool is not None: + if 'nodepool_pool_name' not in instance.metadata: + continue + if pool != instance.metadata['nodepool_pool_name']: + continue + n += 1 + return n + def getImage(self, image_id): return self.ec2.Image(image_id) diff --git a/nodepool/tests/fixtures/aws.yaml b/nodepool/tests/fixtures/aws.yaml index 275056c31..49631538a 100644 --- a/nodepool/tests/fixtures/aws.yaml +++ b/nodepool/tests/fixtures/aws.yaml @@ -16,7 +16,7 @@ providers: username: ubuntu pools: - name: main - max-servers: 5 + max-servers: 1 subnet-id: null security-group-id: null labels: diff --git a/nodepool/tests/unit/test_driver_aws.py b/nodepool/tests/unit/test_driver_aws.py index 835fb0a0e..f4fce4e3e 
100644 --- a/nodepool/tests/unit/test_driver_aws.py +++ b/nodepool/tests/unit/test_driver_aws.py @@ -17,6 +17,7 @@ import fixtures import logging import os import tempfile +import time from unittest.mock import patch import boto3 @@ -78,18 +79,44 @@ class TestDriverAws(tests.DBTestCase): self.log.debug("Waiting for request %s", req.id) req = self.waitForNodeRequest(req) - self.assertEqual(req.state, zk.FULFILLED) - self.assertNotEqual(req.nodes, []) - node = self.zk.getNode(req.nodes[0]) - self.assertEqual(node.allocated_to, req.id) - self.assertEqual(node.state, zk.READY) - self.assertIsNotNone(node.launcher) - self.assertEqual(node.connection_type, 'ssh') - nodescan.assert_called_with( - node.interface_ip, port=22, timeout=180, gather_hostkeys=True) + self.assertEqual(req.state, zk.FULFILLED) - node.state = zk.DELETING - self.zk.storeNode(node) + self.assertNotEqual(req.nodes, []) + node = self.zk.getNode(req.nodes[0]) + self.assertEqual(node.allocated_to, req.id) + self.assertEqual(node.state, zk.READY) + self.assertIsNotNone(node.launcher) + self.assertEqual(node.connection_type, 'ssh') + nodescan.assert_called_with( + node.interface_ip, + port=22, + timeout=180, + gather_hostkeys=True) + # A new request will be paused for lack of quota until this + # one is deleted + req2 = zk.NodeRequest() + req2.state = zk.REQUESTED + req2.node_types.append('ubuntu1404') + self.zk.storeNodeRequest(req2) + req2 = self.waitForNodeRequest( + req2, (zk.PENDING, zk.FAILED, zk.FULFILLED)) + self.assertEqual(req2.state, zk.PENDING) + # It could flip from PENDING to one of the others, so sleep a + # bit and be sure + time.sleep(1) + req2 = self.waitForNodeRequest( + req2, (zk.PENDING, zk.FAILED, zk.FULFILLED)) + self.assertEqual(req2.state, zk.PENDING) - self.waitForNodeDeletion(node) + node.state = zk.DELETING + self.zk.storeNode(node) + + self.waitForNodeDeletion(node) + + req2 = self.waitForNodeRequest(req2, (zk.FAILED, zk.FULFILLED)) + self.assertEqual(req2.state,
zk.FULFILLED) + node = self.zk.getNode(req2.nodes[0]) + node.state = zk.DELETING + self.zk.storeNode(node) + self.waitForNodeDeletion(node)