From a0ac467e82a9c70218acea4fc7e2d7de1edb7f06 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Wed, 17 Mar 2021 19:46:59 -0700 Subject: [PATCH] Azure: implement support for diskimages Change-Id: Ic3089765d5ddfe45f0512f9140b0772ffef89477 --- nodepool/driver/azurestate/adapter.py | 149 ++++++++++++++++++++++---- nodepool/driver/azurestate/azul.py | 79 +++++++++++++- nodepool/driver/azurestate/config.py | 88 ++++++++++++--- nodepool/driver/statemachine.py | 82 ++++++++++++-- 4 files changed, 353 insertions(+), 45 deletions(-) diff --git a/nodepool/driver/azurestate/adapter.py b/nodepool/driver/azurestate/adapter.py index 05931b980..618b7c435 100644 --- a/nodepool/driver/azurestate/adapter.py +++ b/nodepool/driver/azurestate/adapter.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import os import math import logging import json @@ -144,11 +145,13 @@ class AzureCreateStateMachine(statemachine.StateMachine): PIP_QUERY = 'querying pip' COMPLETE = 'complete' - def __init__(self, adapter, hostname, label, metadata, retries): + def __init__(self, adapter, hostname, label, image_external_id, + metadata, retries): super().__init__() self.adapter = adapter self.retries = retries self.attempts = 0 + self.image_external_id = image_external_id self.metadata = metadata self.tags = label.tags.copy() or {} self.tags.update(metadata) @@ -209,7 +212,8 @@ class AzureCreateStateMachine(statemachine.StateMachine): self.nic = self.adapter._refresh(self.nic) if self.adapter._succeeded(self.nic): self.vm = self.adapter._createVirtualMachine( - self.label, self.tags, self.hostname, self.nic) + self.label, self.image_external_id, self.tags, + self.hostname, self.nic) self.state = self.VM_CREATING else: return @@ -288,9 +292,10 @@ class AzureAdapter(statemachine.Adapter): self.skus = {} self._getSKUs() - def getCreateStateMachine(self, hostname, label, metadata, retries): - return AzureCreateStateMachine( - self, hostname, label, metadata, retries) + def getCreateStateMachine(self, hostname, label, + image_external_id, metadata, retries): + return AzureCreateStateMachine(self, hostname, label, + image_external_id, metadata, retries) def getDeleteStateMachine(self, external_id): return AzureDeleteStateMachine(self, external_id) @@ -343,6 +348,94 @@ class AzureAdapter(statemachine.Adapter): self.provider.location)) return quota_info_from_sku(sku) + def uploadImage(self, image_name, filename, image_format, + metadata, md5, sha256): + file_sz = os.path.getsize(filename) + disk_info = { + "location": self.provider.location, + "tags": metadata, + "properties": { + "creationData": { + "createOption": "Upload", + "uploadSizeBytes": file_sz + } + } + } + self.log.debug("Creating disk for image upload") + r = self.azul.disks.create(self.resource_group, image_name, disk_info) + r = self.azul.wait_for_async_operation(r) + + if r['status'] != 'Succeeded': + raise Exception("Unable to create disk for image upload") + disk_id = r['properties']['output']['id'] + + disk_grant = { + "access": "Write", + "durationInSeconds": 24 * 60 * 60, + } + self.log.debug("Enabling write access to disk for image upload") + r = self.azul.disks.post(self.resource_group, image_name, + 'beginGetAccess', disk_grant) + r = self.azul.wait_for_async_operation(r) + + if r['status'] != 'Succeeded': + raise Exception("Unable to begin write access on disk") + sas = r['properties']['output']['accessSAS'] + + self.log.debug("Uploading image") + with open(filename, "rb") as fobj: + self.azul.upload_page_blob_to_sas_url(sas, fobj) + + disk_grant = {} + self.log.debug("Disabling write access to disk for image upload") + r = self.azul.disks.post(self.resource_group, image_name, + 'endGetAccess', disk_grant) + r = self.azul.wait_for_async_operation(r) + + if r['status'] != 'Succeeded': + raise Exception("Unable to end write access on disk") + + image_info = { + "location": self.provider.location, + "tags": metadata, + "properties": { + "hyperVGeneration": "V2", + "storageProfile": { + "osDisk": { + "osType": "Linux", + "managedDisk": { + "id": disk_id, + }, + "osState": "Generalized" + }, + "zoneResilient": True + } + } + } + self.log.debug("Creating image from disk") + r = self.azul.images.create(self.resource_group, image_name, + image_info) + r = self.azul.wait_for_async_operation(r) + + if r['status'] != 'Succeeded': + raise Exception("Unable to create image from disk") + + self.log.debug("Deleting disk for image upload") + r = self.azul.disks.delete(self.resource_group, image_name) + r = self.azul.wait_for_async_operation(r) + + if r['status'] != 'Succeeded': + raise Exception("Unable to delete disk for image upload") + + return image_name + + def deleteImage(self, external_id): + r = self.azul.images.delete(self.resource_group, external_id) + r = self.azul.wait_for_async_operation(r) + + if r['status'] != 'Succeeded': + raise Exception("Unable to delete image") + # Local implementation below def _metadataMatches(self, obj, metadata): @@ -401,6 +494,10 @@ class AzureAdapter(statemachine.Adapter): self.skus[key] = sku self.log.debug("Done querying compute SKUs") + @cachetools.func.ttl_cache(maxsize=0, ttl=(24 * 60 * 60)) + def _getImage(self, image_name): + return self.azul.images.get(self.resource_group, image_name) + @cachetools.func.ttl_cache(maxsize=1, ttl=10) def _listPublicIPAddresses(self): return self.azul.public_ip_addresses.list(self.resource_group) @@ -493,31 +590,41 @@ class AzureAdapter(statemachine.Adapter): def _listVirtualMachines(self): return self.azul.virtual_machines.list(self.resource_group) - def _createVirtualMachine(self, label, tags, hostname, nic): + def _createVirtualMachine(self, label, image_external_id, tags, + hostname, nic): + if image_external_id: + image = label.diskimage + remote_image = self._getImage(image_external_id) + image_reference = {'id': remote_image['id']} + else: + image = label.cloud_image + image_reference = label.cloud_image.image_reference + os_profile = {'computerName': hostname} + if image.username and image.key: + linux_config = { + 'ssh': { + 'publicKeys': [{ + 'path': "/home/%s/.ssh/authorized_keys" % ( + image.username), + 'keyData': image.key, + }] + }, + "disablePasswordAuthentication": True, + } + os_profile['adminUsername'] = image.username + os_profile['linuxConfiguration'] = linux_config + return self.azul.virtual_machines.create( self.resource_group, hostname, { 'location': self.provider.location, 'tags': tags, 'properties': { - 'osProfile': { - 'computerName': hostname, - 'adminUsername': label.cloud_image.username, - 'linuxConfiguration': { - 'ssh': { - 'publicKeys': [{ - 'path': "/home/%s/.ssh/authorized_keys" % ( - label.cloud_image.username), - 'keyData': label.cloud_image.key, - }] - }, - "disablePasswordAuthentication": True, - } - }, + 'osProfile': os_profile, 'hardwareProfile': { 'vmSize': label.hardware_profile["vm-size"] }, 'storageProfile': { - 'imageReference': label.cloud_image.image_reference + 'imageReference': image_reference, }, 'networkProfile': { 'networkInterfaces': [{ diff --git a/nodepool/driver/azurestate/azul.py b/nodepool/driver/azurestate/azul.py index d149618d1..14e86da62 100644 --- a/nodepool/driver/azurestate/azul.py +++ b/nodepool/driver/azurestate/azul.py @@ -12,10 +12,12 @@ # License for the specific language governing permissions and limitations # under the License. -import requests +import concurrent.futures import logging import time +import requests + class AzureAuth(requests.auth.AuthBase): AUTH_URL = "https://login.microsoftonline.com/{tenantId}/oauth2/token" @@ -68,8 +70,12 @@ class AzureCRUD: self.args = kw.copy() self.args.update(self.cloud.credential) - def url(self, **kw): - url = (self.base_subscription_url + self.base_url + def url(self, endpoint=None, **kw): + if endpoint is None: + endpoint = '' + else: + endpoint = '/' + endpoint + url = (self.base_subscription_url + self.base_url + endpoint + '?api-version={apiVersion}') args = self.args.copy() args.update(kw) @@ -105,6 +111,10 @@ class AzureCRUD: url = self.url(**kw) return self.cloud.delete(url) + def _post(self, endpoint, params, **kw): + url = self.url(endpoint=endpoint, **kw) + return self.cloud.post(url, params) + class AzureResourceGroupsCRUD(AzureCRUD): base_url = 'resourcegroups/{resourceGroupName}' @@ -144,6 +154,11 @@ class AzureResourceProviderCRUD(AzureCRUD): return self._delete(resourceGroupName=resource_group_name, resourceName=name) + def post(self, resource_group_name, name, endpoint, params): + return self._post(endpoint, params, + resourceGroupName=resource_group_name, + resourceName=name) + class AzureNetworkCRUD(AzureCRUD): base_url = ( @@ -231,6 +246,11 @@ class AzureCloud: providerId='Microsoft.Compute', resource='disks', apiVersion='2020-06-30') + self.images = AzureResourceProviderCRUD( + self, + providerId='Microsoft.Compute', + resource='images', + apiVersion='2020-12-01') self.resource_groups = AzureResourceGroupsCRUD( self, apiVersion='2020-06-01') @@ -252,9 +272,12 @@ class AzureCloud: def get(self, url, codes=[200]): return self.request('GET', url, None, codes) - def put(self, url, data, codes=[200, 201]): + def put(self, url, data, codes=[200, 201, 202]): return self.request('PUT', url, data, codes) + def post(self, url, data, codes=[200, 202]): + return self.request('POST', url, data, codes) + def delete(self, url, codes=[200, 201, 202, 204]): return self.request('DELETE', url, None, codes) @@ -323,6 +346,52 @@ class AzureCloud: if ret['status'] == 'InProgress': continue if ret['status'] == 'Succeeded': - return + return ret raise Exception("Unhandled async operation result: %s", ret['status']) + + def _upload_chunk(self, url, start, end, data): + headers = { + 'x-ms-blob-type': 'PageBlob', + 'x-ms-page-write': 'Update', + 'Content-Length': str(len(data)), + 'Range': f'bytes={start}-{end}', + } + attempts = 10 + for x in range(attempts): + try: + requests.put(url, headers=headers, data=data).\ + raise_for_status() + break + except Exception: + if x == attempts - 1: + raise + else: + time.sleep(2 * x) + + def upload_page_blob_to_sas_url(self, url, file_object, + pagesize=(4 * 1024 * 1024), + concurrency=10): + start = 0 + futures = set() + if 'comp=page' not in url: + url += '&comp=page' + with concurrent.futures.ThreadPoolExecutor( + max_workers=concurrency) as executor: + while True: + chunk = file_object.read(pagesize) + if not chunk: + break + end = start + len(chunk) - 1 + future = executor.submit(self._upload_chunk, url, + start, end, chunk) + start += len(chunk) + futures.add(future) + # Keep the pool of work supplied with data but without + # reading the entire file into memory. + if len(futures) >= (concurrency * 2): + (done, futures) = concurrent.futures.wait( + futures, + return_when=concurrent.futures.FIRST_COMPLETED) + # We're done reading the file, wait for all uploads to finish + (done, futures) = concurrent.futures.wait(futures) diff --git a/nodepool/driver/azurestate/config.py b/nodepool/driver/azurestate/config.py index 329de1fd5..46328a797 100644 --- a/nodepool/driver/azurestate/config.py +++ b/nodepool/driver/azurestate/config.py @@ -67,6 +67,43 @@ class AzureProviderCloudImage(ConfigValue): } +class AzureProviderDiskImage(ConfigValue): + def __init__(self, image, diskimage): + default_port_mapping = { + 'ssh': 22, + 'winrm': 5986, + } + self.name = image['name'] + diskimage.image_types.add('vhd') + self.pause = bool(image.get('pause', False)) + self.python_path = image.get('python-path') + self.username = image.get('username') + self.key = image.get('key') + self.connection_type = image.get('connection-type', 'ssh') + self.connection_port = image.get( + 'connection-port', + default_port_mapping.get(self.connection_type, 22)) + self.meta = {} + + @property + def external_name(self): + '''Human readable version of external.''' + return self.name + + @staticmethod + def getSchema(): + return { + v.Required('name'): str, + 'username': str, + 'key': str, + 'pause': bool, + 'connection-type': str, + 'connection-port': int, + 'python-path': str, + # TODO(corvus): shell-type + } + + class AzureLabel(ConfigValue): ignore_equality = ['pool'] @@ -76,15 +113,31 @@ class AzureLabel(ConfigValue): self.name = label['name'] self.pool = provider_pool - cloud_image_name = label['cloud-image'] - cloud_image = provider_config.cloud_images.get( - cloud_image_name, None) - if not cloud_image: - raise ValueError( - "cloud-image %s does not exist in provider %s" - " but is referenced in label %s" % - (cloud_image_name, provider_config.name, self.name)) - self.cloud_image = cloud_image + cloud_image_name = label.get('cloud-image') + if cloud_image_name: + cloud_image = provider_config.cloud_images.get( + cloud_image_name, None) + if not cloud_image: + raise ValueError( + "cloud-image %s does not exist in provider %s" + " but is referenced in label %s" % + (cloud_image_name, provider_config.name, self.name)) + self.cloud_image = cloud_image + else: + self.cloud_image = None + + diskimage_name = label.get('diskimage') + if diskimage_name: + diskimage = provider_config.diskimages.get( + diskimage_name, None) + if not diskimage: + raise ValueError( + "diskimage %s does not exist in provider %s" + " but is referenced in label %s" % + (diskimage_name, provider_config.name, self.name)) + self.diskimage = diskimage + else: + self.diskimage = None self.hardware_profile = label['hardware-profile'] self.tags = label.get('tags', {}) @@ -97,7 +150,8 @@ class AzureLabel(ConfigValue): return { v.Required('name'): str, - v.Required('cloud-image'): str, + 'cloud-image': str, + 'diskimage': str, v.Required('hardware-profile'): azure_hardware_profile, 'tags': dict, } @@ -160,19 +214,23 @@ class AzureProviderConfig(ProviderConfig): @property def manage_images(self): - return False + return True @staticmethod def reset(): pass def load(self, config): + self.image_type = 'vhd' + self.image_name_format = '{image_name}-{timestamp}' + self.post_upload_hook = self.provider.get('post-upload-hook') + self.rate_limit = self.provider.get('rate-limit', 1) self.launch_retries = self.provider.get('launch-retries', 3) self.boot_timeout = self.provider.get('boot-timeout', 60) # TODO(corvus): remove - self.zuul_public_key = self.provider['zuul-public-key'] + self.zuul_public_key = self.provider.get('zuul-public-key') self.location = self.provider['location'] self.subnet_id = self.provider.get('subnet-id') self.network = self.provider.get('network') @@ -194,6 +252,12 @@ class AzureProviderConfig(ProviderConfig): i = AzureProviderCloudImage(image, self.zuul_public_key) self.cloud_images[i.name] = i + self.diskimages = {} + for image in self.provider['diskimages']: + diskimage = config.diskimages[image['name']] + i = AzureProviderDiskImage(image, diskimage) + self.diskimages[i.name] = i + for pool in self.provider.get('pools', []): pp = AzurePool(self, pool) self._pools[pp.name] = pp diff --git a/nodepool/driver/statemachine.py b/nodepool/driver/statemachine.py index 942ace609..67892155f 100644 --- a/nodepool/driver/statemachine.py +++ b/nodepool/driver/statemachine.py @@ -89,13 +89,44 @@ class StateMachineNodeLauncher(stats.StatsReporter): def launch(self): label = self.handler.pool.labels[self.node.type[0]] + if label.diskimage: + diskimage = self.provider_config.diskimages[ + label.diskimage.name] + cloud_image = self.zk.getMostRecentImageUpload( + diskimage.name, self.provider_config.name) + + if not cloud_image: + raise exceptions.LaunchNodepoolException( + "Unable to find current cloud image %s in %s" % + (diskimage.name, self.provider_config.name) + ) + + image_external_id = cloud_image.external_id + self.node.image_id = "{path}/{upload_id}".format( + path=self.zk._imageUploadPath( + cloud_image.image_name, + cloud_image.build_id, + cloud_image.provider_name), + upload_id=cloud_image.id) + image = diskimage + else: + image_external_id = None + self.node.image_id = label.cloud_image.name + image = label.cloud_image + + self.node.username = image.username + self.node.python_path = image.python_path + self.node.connection_port = image.connection_port + self.node.connection_type = image.connection_type + self.zk.storeNode(self.node) + hostname = 'nodepool-' + self.node.id retries = self.manager.provider.launch_retries metadata = {'nodepool_node_id': self.node.id, 'nodepool_pool_name': self.handler.pool.name, 'nodepool_provider_name': self.manager.provider.name} self.state_machine = self.manager.adapter.getCreateStateMachine( - hostname, label, metadata, retries) + hostname, label, image_external_id, metadata, retries) def updateNodeFromInstance(self, instance): if instance is None: @@ -103,7 +134,6 @@ class StateMachineNodeLauncher(stats.StatsReporter): node = self.node pool = self.handler.pool - label = pool.labels[self.node.type[0]] if (pool.use_internal_ip and (instance.private_ipv4 or instance.private_ipv6)): @@ -118,10 +148,6 @@ class StateMachineNodeLauncher(stats.StatsReporter): node.public_ipv6 = instance.public_ipv6 node.region = instance.region node.az = instance.az - node.username = label.cloud_image.username - node.python_path = label.cloud_image.python_path - node.connection_port = label.cloud_image.connection_port - node.connection_type = label.cloud_image.connection_type self.zk.storeNode(node) def runStateMachine(self): @@ -556,6 +582,18 @@ class StateMachineProvider(Provider, QuotaSupport): metadata = {'nodepool_provider_name': self.provider.name} self.adapter.cleanupLeakedResources(known_nodes, metadata) + # Image handling + + def uploadImage(self, image_name, filename, image_type=None, meta=None, + md5=None, sha256=None): + return self.adapter.uploadImage(image_name, filename, + image_format=image_type, + metadata=meta, md5=md5, + sha256=sha256) + + def deleteImage(self, name, id): + return self.adapter.deleteImage(external_id=id) + # Driver implementation @@ -676,7 +714,8 @@ class Adapter: def __init__(self, provider_config): pass - def getCreateStateMachine(self, hostname, label, metadata, retries): + def getCreateStateMachine(self, hostname, label, + image_external_id, metadata, retries): """Return a state machine suitable for creating an instance This method should return a new state machine object @@ -685,6 +724,9 @@ class Adapter: :param str hostname: The hostname of the node. :param ProviderLabel label: A config object representing the provider-label for the node. + :param str image_external_id: If provided, the external id of + a previously uploaded image; if None, then the adapter should + look up a cloud image based on the label. :param metadata dict: A dictionary of metadata that must be stored on the instance in the cloud. The same data must be able to be returned later on :py:class:`Instance` objects @@ -767,3 +809,29 @@ class Adapter: :returns: A :py:class:`QuotaInformation` object. """ return QuotaInformation(instances=1) + + # The following methods must be implemented only if image + # management is supported: + + def uploadImage(self, image_name, filename, image_format=None, + metadata=None, md5=None, sha256=None): + """Upload the image to the cloud + + :param image_name str: The name of the image + :param filename str: The path to the local file to be uploaded + :param image_format str: The format of the image (e.g., "qcow") + :param metadata dict: A dictionary of metadata that must be + stored on the image in the cloud. + :param md5 str: The md5 hash of the image file + :param sha256 str: The sha256 hash of the image file + + :return: The external id of the image in the cloud + """ + raise NotImplementedError() + + def deleteImage(self, external_id): + """Delete an image from the cloud + + :param external_id str: The external id of the image to delete + """ + raise NotImplementedError()