Finish adding GCE quota support

This expands the simple driver with the calls needed to get the
current quota limits, as well as the expected and current quota usage
of flavors. That is enough to handle real cloud quota.

This change also removes a leftover debug line.

Change-Id: I837f5ad8e697b48ad8138021e64e799c8be3c6fc
This commit is contained in:
James E. Blair 2020-08-04 16:15:07 -07:00
parent 6ebac5fd17
commit 120f399ec6
4 changed files with 278 additions and 27 deletions

View File

@ -13,9 +13,11 @@
# under the License.
import logging
import math
from nodepool.driver.simple import SimpleTaskManagerAdapter
from nodepool.driver.simple import SimpleTaskManagerInstance
from nodepool.driver.utils import QuotaInformation
import googleapiclient.discovery
@ -36,11 +38,18 @@ class GCEInstance(SimpleTaskManagerInstance):
if len(access):
self.public_ipv4 = access[0].get('natIP')
self.interface_ip = self.public_ipv4 or self.private_ipv4
self._machine_type = data.get('_nodepool_gce_machine_type')
if data.get('metadata'):
for item in data['metadata'].get('items', []):
self.metadata[item['key']] = item['value']
def getQuotaInformation(self):
    """Return the quota consumed by this single instance.

    Cores and RAM are read from the machine type record kept in
    ``self._machine_type`` (its ``guestCpus`` and ``memoryMb``
    entries); the instance itself always counts as one.

    :returns: A :py:class:`QuotaInformation` object.
    """
    mtype = self._machine_type
    cpu_count = mtype['guestCpus']
    memory = mtype['memoryMb']
    return QuotaInformation(cores=cpu_count, instances=1, ram=memory)
class GCEAdapter(SimpleTaskManagerAdapter):
log = logging.getLogger("nodepool.driver.gce.GCEAdapter")
@ -48,6 +57,7 @@ class GCEAdapter(SimpleTaskManagerAdapter):
def __init__(self, provider):
    """Set up the adapter for one provider.

    :param provider: The provider configuration; kept as
        ``self.provider``.
    """
    self.provider = provider
    # Client object for version "v1" of the "compute" API.
    self.compute = googleapiclient.discovery.build('compute', 'v1')
    # Machine type records already fetched, keyed by machine type name.
    self._machine_types = dict()
def listInstances(self, task_manager):
servers = []
@ -58,6 +68,9 @@ class GCEAdapter(SimpleTaskManagerAdapter):
result = q.execute()
for instance in result.get('items', []):
instance_type = instance['machineType'].split('/')[-1]
mtype = self._getMachineType(task_manager, instance_type)
instance['_nodepool_gce_machine_type'] = mtype
servers.append(GCEInstance(instance))
return servers
@ -84,6 +97,18 @@ class GCEAdapter(SimpleTaskManagerAdapter):
return image_id
def _getMachineType(self, task_manager, machine_type):
    """Return the machine type record for ``machine_type``.

    Results are remembered in ``self._machine_types`` so each machine
    type name is only requested from the API once per adapter.

    :param task_manager: Object whose ``rateLimit()`` context manager
        wraps the API call.
    :param machine_type: Machine type name, passed to the API as
        ``machineType``.
    :returns: The result of executing the ``machineTypes().get``
        request (or the previously stored result).
    """
    try:
        return self._machine_types[machine_type]
    except KeyError:
        # Not cached yet; fall through and ask the API.
        pass

    request = self.compute.machineTypes().get(
        project=self.provider.project,
        zone=self.provider.zone,
        machineType=machine_type)
    with task_manager.rateLimit():
        fetched = request.execute()
    self._machine_types[machine_type] = fetched
    return fetched
def createInstance(self, task_manager, hostname, metadata, label):
image_id = self._getImageId(task_manager, label.cloud_image)
disk_init = dict(sourceImage=image_id,
@ -93,8 +118,8 @@ class GCEAdapter(SimpleTaskManagerAdapter):
disk = dict(boot=True,
autoDelete=True,
initializeParams=disk_init)
machine_type = 'zones/{}/machineTypes/{}'.format(
self.provider.zone, label.instance_type)
mtype = self._getMachineType(task_manager, label.instance_type)
machine_type = mtype['selfLink']
network = dict(network='global/networks/default',
accessConfigs=[dict(
type='ONE_TO_ONE_NAT',
@ -117,3 +142,30 @@ class GCEAdapter(SimpleTaskManagerAdapter):
with task_manager.rateLimit():
q.execute()
return hostname
def getQuotaLimits(self, task_manager):
    """Return the quota limits reported for the provider's region.

    Fetches the region resource and scans its ``quotas`` list for the
    ``CPUS`` and ``INSTANCES`` metrics. A metric that does not appear
    is passed on as ``None``; every other resource uses the
    ``math.inf`` default.

    :param task_manager: Object whose ``rateLimit()`` context manager
        wraps the API call.
    :returns: A :py:class:`QuotaInformation` object.
    """
    request = self.compute.regions().get(
        project=self.provider.project,
        region=self.provider.region)
    with task_manager.rateLimit():
        region = request.execute()

    cpu_limit = None
    instance_limit = None
    for quota in region['quotas']:
        if quota['metric'] == 'CPUS':
            cpu_limit = quota['limit']
        elif quota['metric'] == 'INSTANCES':
            instance_limit = quota['limit']

    return QuotaInformation(cores=cpu_limit,
                            instances=instance_limit,
                            default=math.inf)
def getQuotaForLabel(self, task_manager, label):
    """Return the quota one instance of ``label`` would consume.

    The label's ``instance_type`` is resolved to a machine type
    record, whose ``guestCpus`` and ``memoryMb`` entries supply the
    cores and RAM; the instance count is always one.

    :param task_manager: Passed through to the machine type lookup.
    :param label: Label configuration providing ``instance_type``.
    :returns: A :py:class:`QuotaInformation` object.
    """
    machine = self._getMachineType(task_manager, label.instance_type)
    cpu_count = machine['guestCpus']
    memory = machine['memoryMb']
    return QuotaInformation(cores=cpu_count, instances=1, ram=memory)

View File

@ -50,6 +50,21 @@ class ListInstancesTask(Task):
return self.args['adapter'].listInstances(manager)
class GetQuotaLimitsTask(Task):
    """Task that asks the adapter for the provider's quota limits."""

    name = 'get_quota_limits'

    def main(self, manager):
        """Call ``getQuotaLimits`` on the adapter given in the task args.

        :param manager: Forwarded to the adapter as its task manager
            argument.
        :returns: Whatever the adapter's ``getQuotaLimits`` returns.
        """
        adapter = self.args['adapter']
        return adapter.getQuotaLimits(manager)
class GetQuotaForLabelTask(Task):
    """Task that asks the adapter for the quota needed by one label."""

    name = 'get_quota_for_label'

    def main(self, manager):
        """Call ``getQuotaForLabel`` on the adapter given in the task args.

        :param manager: Forwarded to the adapter as its task manager
            argument.
        :returns: Whatever the adapter's ``getQuotaForLabel`` returns
            for the ``label_config`` task argument.
        """
        quota_for_label = self.args['adapter'].getQuotaForLabel
        return quota_for_label(manager, self.args['label_config'])
class SimpleTaskManagerLauncher(NodeLauncher):
"""The NodeLauncher implementation for the SimpleTaskManager driver
framework"""
@ -175,21 +190,28 @@ class SimpleTaskManagerHandler(NodeRequestHandler):
:param node_types: list of node types to check
:return: True if the node list fits into the provider, False otherwise
'''
# TODO: Add support for real quota handling; this only handles
# max_servers.
needed_quota = QuotaInformation(
cores=1,
instances=len(node_types),
ram=1,
default=1)
needed_quota = QuotaInformation()
for ntype in node_types:
needed_quota.add(
self.manager.quotaNeededByLabel(ntype, self.pool))
if hasattr(self.pool, 'ignore_provider_quota'):
if not self.pool.ignore_provider_quota:
cloud_quota = self.manager.estimatedNodepoolQuota()
cloud_quota.subtract(needed_quota)
if not cloud_quota.non_negative():
return False
# Now calculate pool specific quota. Values indicating no quota default
# to math.inf representing infinity that can be calculated with.
pool_quota = QuotaInformation(
cores=math.inf,
cores=getattr(self.pool, 'max_cores', None),
instances=self.pool.max_servers,
ram=math.inf,
ram=getattr(self.pool, 'max_ram', None),
default=math.inf)
pool_quota.subtract(needed_quota)
self.log.debug("hasProviderQuota({},{}) = {}".format(
self.pool, node_types, pool_quota))
return pool_quota.non_negative()
def hasRemainingQuota(self, ntype):
@ -216,9 +238,11 @@ class SimpleTaskManagerHandler(NodeRequestHandler):
# Now calculate pool specific quota. Values indicating no quota default
# to math.inf representing infinity that can be calculated with.
# TODO: add cores, ram
pool_quota = QuotaInformation(instances=self.pool.max_servers,
default=math.inf)
pool_quota = QuotaInformation(
cores=getattr(self.pool, 'max_cores', None),
instances=self.pool.max_servers,
ram=getattr(self.pool, 'max_ram', None),
default=math.inf)
pool_quota.subtract(
self.manager.estimatedNodepoolQuotaUsed(self.pool))
self.log.debug("Current pool quota: %s" % pool_quota)
@ -279,20 +303,56 @@ class SimpleTaskManagerProvider(BaseTaskManagerProvider, QuotaSupport):
return True
def getProviderLimits(self):
# TODO: query the api to get real limits
return QuotaInformation(
cores=math.inf,
instances=math.inf,
ram=math.inf,
default=math.inf)
try:
t = self.task_manager.submitTask(GetQuotaLimitsTask(
adapter=self.adapter))
return t.wait()
except NotImplementedError:
return QuotaInformation(
cores=math.inf,
instances=math.inf,
ram=math.inf,
default=math.inf)
def quotaNeededByLabel(self, ntype, pool):
# TODO: return real quota information about a label
return QuotaInformation(cores=1, instances=1, ram=1, default=1)
provider_label = pool.labels[ntype]
try:
t = self.task_manager.submitTask(GetQuotaForLabelTask(
adapter=self.adapter, label_config=provider_label))
return t.wait()
except NotImplementedError:
return QuotaInformation()
def unmanagedQuotaUsed(self):
# TODO: return real quota information about quota
return QuotaInformation()
'''
Sums up the quota used by servers unmanaged by nodepool.
:return: Calculated quota in use by unmanaged servers
'''
used_quota = QuotaInformation()
node_ids = set([n.id for n in self._zk.nodeIterator()])
for server in self.listNodes():
meta = server.metadata
nodepool_provider_name = meta.get('nodepool_provider_name')
if (nodepool_provider_name and
nodepool_provider_name == self.provider.name):
# This provider (regardless of the launcher) owns this
# node so it must not be accounted for unmanaged
# quota; unless it has leaked.
nodepool_node_id = meta.get('nodepool_node_id')
if nodepool_node_id and nodepool_node_id in node_ids:
# It has not leaked.
continue
try:
qi = server.getQuotaInformation()
except NotImplementedError:
qi = QuotaInformation()
used_quota.add(qi)
return used_quota
def cleanupNode(self, external_id):
instance = self.getInstance(external_id)
@ -434,6 +494,13 @@ class SimpleTaskManagerInstance:
"""
raise NotImplementedError()
def getQuotaInformation(self):
    """Return quota information about this instance.

    This base implementation always raises; subclasses override it to
    report what the instance consumes.

    :returns: A :py:class:`QuotaInformation` object.
    :raises NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
class SimpleTaskManagerAdapter:
"""Public interface for the simple TaskManager Provider
@ -487,6 +554,36 @@ class SimpleTaskManagerAdapter:
"""
raise NotImplementedError()
def getQuotaLimits(self, task_manager):
    """Return the quota limits for this provider

    This default implementation reports no limits at all: every
    resource defaults to ``math.inf``. Override it to provide
    accurate information.

    :param TaskManager task_manager: An instance of
        :py:class:`~nodepool.driver.taskmanager.TaskManager`.
    :returns: A :py:class:`QuotaInformation` object.
    """
    unlimited = QuotaInformation(default=math.inf)
    return unlimited
def getQuotaForLabel(self, task_manager, label_config):
    """Return information about the quota used for a label

    This default implementation only accounts for a single instance;
    override it to return more detailed information including cores
    and RAM.

    :param TaskManager task_manager: An instance of
        :py:class:`~nodepool.driver.taskmanager.TaskManager`.
    :param ProviderLabel label_config: A LabelConfig object describing
        a label for an instance.
    :returns: A :py:class:`QuotaInformation` object.
    """
    single_instance = QuotaInformation(instances=1)
    return single_instance
class SimpleTaskManagerDriver(Driver):
"""Subclass this to make a simple driver"""

View File

@ -211,7 +211,6 @@ class QuotaSupport:
:return: QuotaInformation about the label
'''
print("base")
pass
def invalidateQuotaCache(self):

View File

@ -100,10 +100,107 @@ class GCloudImages(GCloudCollection):
raise googleapiclient.errors.HttpError(404, b'')
class GCloudMachineTypes(GCloudCollection):
    """Fake ``machineTypes`` collection holding two canned machine types."""

    def __init__(self, *args, **kw):
        """Initialise the collection and add the canned machine types."""
        super().__init__(*args, **kw)
        n1_standard_2 = {
            "id": "3002",
            "creationTimestamp": "1969-12-31T16:00:00.000-08:00",
            "name": "n1-standard-2",
            "description": "2 vCPUs, 7.5 GB RAM",
            "guestCpus": 2,
            "memoryMb": 7680,
            "imageSpaceGb": 10,
            "maximumPersistentDisks": 128,
            "maximumPersistentDisksSizeGb": "263168",
            "zone": "us-central1-a",
            "selfLink": "https://www.googleapis.com/compute/v1/projects/"
            "gcloud-project/zones/us-central1-a/machineTypes/n1-standard-2",
            "isSharedCpu": False,
            "kind": "compute#machineType"
        }
        f1_micro = {
            "id": "1000",
            "creationTimestamp": "1969-12-31T16:00:00.000-08:00",
            "name": "f1-micro",
            "description": "1 vCPU (shared physical core) and 0.6 GB RAM",
            "guestCpus": 1,
            "memoryMb": 614,
            "imageSpaceGb": 0,
            "maximumPersistentDisks": 16,
            "maximumPersistentDisksSizeGb": "3072",
            "zone": "us-central1-a",
            "selfLink": "https://www.googleapis.com/compute/v1/projects/"
            "gcloud-project/zones/us-central1-a/machineTypes/f1-micro",
            "isSharedCpu": True,
            "kind": "compute#machineType"
        }
        for canned in (n1_standard_2, f1_micro):
            self.items.append(canned)

    def get(self, *args, **kw):
        """Return a request object that defers the lookup to ``_get``."""
        return GCloudRequest(self._get, args, kw)

    def _get(self, *args, **kw):
        """Return the item whose ``name`` equals the ``machineType`` keyword.

        :raises googleapiclient.errors.HttpError: If no item matches.
        """
        for entry in self.items:
            if kw['machineType'] == entry['name']:
                return entry
        # Note this isn't quite right, but at least it's the correct class
        raise googleapiclient.errors.HttpError(404, b'')
class GCloudRegions(GCloudCollection):
    """Fake ``regions`` collection holding one canned region."""

    def __init__(self, *args, **kw):
        """Initialise the collection and add the canned region."""
        super().__init__(*args, **kw)
        us_central1 = {
            "id": "1000",
            "creationTimestamp": "1969-12-31T16:00:00.000-08:00",
            "name": "us-central1",
            "description": "us-central1",
            "status": "UP",
            "zones": [
                "https://www.googleapis.com/compute/v1/projects/gcloud-project"
                "/zones/us-central1-a",
                "https://www.googleapis.com/compute/v1/projects/gcloud-project"
                "/zones/us-central1-b",
                "https://www.googleapis.com/compute/v1/projects/gcloud-project"
                "/zones/us-central1-c",
                "https://www.googleapis.com/compute/v1/projects/gcloud-project"
                "/zones/us-central1-f"
            ],
            "quotas": [
                {"metric": "CPUS", "limit": 24, "usage": 0},  # noqa
                {"metric": "DISKS_TOTAL_GB", "limit": 4096, "usage": 0},  # noqa
                {"metric": "STATIC_ADDRESSES", "limit": 8, "usage": 0},  # noqa
                {"metric": "IN_USE_ADDRESSES", "limit": 8, "usage": 0},  # noqa
                {"metric": "SSD_TOTAL_GB", "limit": 500, "usage": 0},  # noqa
                {"metric": "LOCAL_SSD_TOTAL_GB", "limit": 6000, "usage": 0},  # noqa
                {"metric": "INSTANCES", "limit": 24, "usage": 0},  # noqa
                {"metric": "PREEMPTIBLE_CPUS", "limit": 0, "usage": 0},  # noqa
                {"metric": "COMMITTED_CPUS", "limit": 0, "usage": 0},  # noqa
                {"metric": "INTERNAL_ADDRESSES", "limit": 200, "usage": 0},  # noqa
                # A bunch of other quotas elided for space
            ],
            "selfLink": "https://www.googleapis.com/compute/v1/projects/"
            "gcloud-project/regions/us-central1",
            "kind": "compute#region"
        }
        self.items.append(us_central1)

    def get(self, *args, **kw):
        """Return a request object that defers the lookup to ``_get``."""
        return GCloudRequest(self._get, args, kw)

    def _get(self, *args, **kw):
        """Return the item whose ``name`` equals the ``region`` keyword.

        :raises googleapiclient.errors.HttpError: If no item matches.
        """
        for entry in self.items:
            if kw['region'] == entry['name']:
                return entry
        # Note this isn't quite right, but at least it's the correct class
        raise googleapiclient.errors.HttpError(404, b'')
class GCloudComputeEmulator:
def __init__(self):
    """Create one fake collection per resource kind the emulator serves."""
    # Each collection is built once here and handed out unchanged by
    # the accessor method of the matching name.
    self._instances = GCloudInstances()
    self._images = GCloudImages()
    self._machine_types = GCloudMachineTypes()
    self._regions = GCloudRegions()
def instances(self):
    """Return the emulator's instances collection."""
    return self._instances
@ -111,6 +208,12 @@ class GCloudComputeEmulator:
def images(self):
    """Return the emulator's images collection."""
    return self._images
def machineTypes(self):
    """Return the emulator's machine types collection."""
    return self._machine_types
def regions(self):
    """Return the emulator's regions collection."""
    return self._regions
class GCloudEmulator:
def __init__(self):