Azure: implement support for diskimages

Change-Id: Ic3089765d5ddfe45f0512f9140b0772ffef89477
This commit is contained in:
James E. Blair 2021-03-17 19:46:59 -07:00
parent e09350a1ce
commit a0ac467e82
4 changed files with 353 additions and 45 deletions

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os
import math import math
import logging import logging
import json import json
@ -144,11 +145,13 @@ class AzureCreateStateMachine(statemachine.StateMachine):
PIP_QUERY = 'querying pip' PIP_QUERY = 'querying pip'
COMPLETE = 'complete' COMPLETE = 'complete'
def __init__(self, adapter, hostname, label, metadata, retries): def __init__(self, adapter, hostname, label, image_external_id,
metadata, retries):
super().__init__() super().__init__()
self.adapter = adapter self.adapter = adapter
self.retries = retries self.retries = retries
self.attempts = 0 self.attempts = 0
self.image_external_id = image_external_id
self.metadata = metadata self.metadata = metadata
self.tags = label.tags.copy() or {} self.tags = label.tags.copy() or {}
self.tags.update(metadata) self.tags.update(metadata)
@ -209,7 +212,8 @@ class AzureCreateStateMachine(statemachine.StateMachine):
self.nic = self.adapter._refresh(self.nic) self.nic = self.adapter._refresh(self.nic)
if self.adapter._succeeded(self.nic): if self.adapter._succeeded(self.nic):
self.vm = self.adapter._createVirtualMachine( self.vm = self.adapter._createVirtualMachine(
self.label, self.tags, self.hostname, self.nic) self.label, self.image_external_id, self.tags,
self.hostname, self.nic)
self.state = self.VM_CREATING self.state = self.VM_CREATING
else: else:
return return
@ -288,9 +292,10 @@ class AzureAdapter(statemachine.Adapter):
self.skus = {} self.skus = {}
self._getSKUs() self._getSKUs()
def getCreateStateMachine(self, hostname, label, metadata, retries): def getCreateStateMachine(self, hostname, label,
return AzureCreateStateMachine( image_external_id, metadata, retries):
self, hostname, label, metadata, retries) return AzureCreateStateMachine(self, hostname, label,
image_external_id, metadata, retries)
def getDeleteStateMachine(self, external_id): def getDeleteStateMachine(self, external_id):
return AzureDeleteStateMachine(self, external_id) return AzureDeleteStateMachine(self, external_id)
@ -343,6 +348,94 @@ class AzureAdapter(statemachine.Adapter):
self.provider.location)) self.provider.location))
return quota_info_from_sku(sku) return quota_info_from_sku(sku)
def uploadImage(self, image_name, filename, image_format,
metadata, md5, sha256):
file_sz = os.path.getsize(filename)
disk_info = {
"location": self.provider.location,
"tags": metadata,
"properties": {
"creationData": {
"createOption": "Upload",
"uploadSizeBytes": file_sz
}
}
}
self.log.debug("Creating disk for image upload")
r = self.azul.disks.create(self.resource_group, image_name, disk_info)
r = self.azul.wait_for_async_operation(r)
if r['status'] != 'Succeeded':
raise Exception("Unable to create disk for image upload")
disk_id = r['properties']['output']['id']
disk_grant = {
"access": "Write",
"durationInSeconds": 24 * 60 * 60,
}
self.log.debug("Enabling write access to disk for image upload")
r = self.azul.disks.post(self.resource_group, image_name,
'beginGetAccess', disk_grant)
r = self.azul.wait_for_async_operation(r)
if r['status'] != 'Succeeded':
raise Exception("Unable to begin write access on disk")
sas = r['properties']['output']['accessSAS']
self.log.debug("Uploading image")
with open(filename, "rb") as fobj:
self.azul.upload_page_blob_to_sas_url(sas, fobj)
disk_grant = {}
self.log.debug("Disabling write access to disk for image upload")
r = self.azul.disks.post(self.resource_group, image_name,
'endGetAccess', disk_grant)
r = self.azul.wait_for_async_operation(r)
if r['status'] != 'Succeeded':
raise Exception("Unable to end write access on disk")
image_info = {
"location": self.provider.location,
"tags": metadata,
"properties": {
"hyperVGeneration": "V2",
"storageProfile": {
"osDisk": {
"osType": "Linux",
"managedDisk": {
"id": disk_id,
},
"osState": "Generalized"
},
"zoneResilient": True
}
}
}
self.log.debug("Creating image from disk")
r = self.azul.images.create(self.resource_group, image_name,
image_info)
r = self.azul.wait_for_async_operation(r)
if r['status'] != 'Succeeded':
raise Exception("Unable to create image from disk")
self.log.debug("Deleting disk for image upload")
r = self.azul.disks.delete(self.resource_group, image_name)
r = self.azul.wait_for_async_operation(r)
if r['status'] != 'Succeeded':
raise Exception("Unable to delete disk for image upload")
return image_name
def deleteImage(self, external_id):
r = self.azul.images.delete(self.resource_group, external_id)
r = self.azul.wait_for_async_operation(r)
if r['status'] != 'Succeeded':
raise Exception("Unable to delete image")
# Local implementation below # Local implementation below
def _metadataMatches(self, obj, metadata): def _metadataMatches(self, obj, metadata):
@ -401,6 +494,10 @@ class AzureAdapter(statemachine.Adapter):
self.skus[key] = sku self.skus[key] = sku
self.log.debug("Done querying compute SKUs") self.log.debug("Done querying compute SKUs")
@cachetools.func.ttl_cache(maxsize=0, ttl=(24 * 60 * 60))
def _getImage(self, image_name):
return self.azul.images.get(self.resource_group, image_name)
@cachetools.func.ttl_cache(maxsize=1, ttl=10) @cachetools.func.ttl_cache(maxsize=1, ttl=10)
def _listPublicIPAddresses(self): def _listPublicIPAddresses(self):
return self.azul.public_ip_addresses.list(self.resource_group) return self.azul.public_ip_addresses.list(self.resource_group)
@ -493,31 +590,41 @@ class AzureAdapter(statemachine.Adapter):
def _listVirtualMachines(self): def _listVirtualMachines(self):
return self.azul.virtual_machines.list(self.resource_group) return self.azul.virtual_machines.list(self.resource_group)
def _createVirtualMachine(self, label, tags, hostname, nic): def _createVirtualMachine(self, label, image_external_id, tags,
hostname, nic):
if image_external_id:
image = label.diskimage
remote_image = self._getImage(image_external_id)
image_reference = {'id': remote_image['id']}
else:
image = label.cloud_image
image_reference = label.cloud_image.image_reference
os_profile = {'computerName': hostname}
if image.username and image.key:
linux_config = {
'ssh': {
'publicKeys': [{
'path': "/home/%s/.ssh/authorized_keys" % (
image.username),
'keyData': image.key,
}]
},
"disablePasswordAuthentication": True,
}
os_profile['adminUsername'] = image.username
os_profile['linuxConfiguration'] = linux_config
return self.azul.virtual_machines.create( return self.azul.virtual_machines.create(
self.resource_group, hostname, { self.resource_group, hostname, {
'location': self.provider.location, 'location': self.provider.location,
'tags': tags, 'tags': tags,
'properties': { 'properties': {
'osProfile': { 'osProfile': os_profile,
'computerName': hostname,
'adminUsername': label.cloud_image.username,
'linuxConfiguration': {
'ssh': {
'publicKeys': [{
'path': "/home/%s/.ssh/authorized_keys" % (
label.cloud_image.username),
'keyData': label.cloud_image.key,
}]
},
"disablePasswordAuthentication": True,
}
},
'hardwareProfile': { 'hardwareProfile': {
'vmSize': label.hardware_profile["vm-size"] 'vmSize': label.hardware_profile["vm-size"]
}, },
'storageProfile': { 'storageProfile': {
'imageReference': label.cloud_image.image_reference 'imageReference': image_reference,
}, },
'networkProfile': { 'networkProfile': {
'networkInterfaces': [{ 'networkInterfaces': [{

View File

@ -12,10 +12,12 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import requests import concurrent.futures
import logging import logging
import time import time
import requests
class AzureAuth(requests.auth.AuthBase): class AzureAuth(requests.auth.AuthBase):
AUTH_URL = "https://login.microsoftonline.com/{tenantId}/oauth2/token" AUTH_URL = "https://login.microsoftonline.com/{tenantId}/oauth2/token"
@ -68,8 +70,12 @@ class AzureCRUD:
self.args = kw.copy() self.args = kw.copy()
self.args.update(self.cloud.credential) self.args.update(self.cloud.credential)
def url(self, **kw): def url(self, endpoint=None, **kw):
url = (self.base_subscription_url + self.base_url if endpoint is None:
endpoint = ''
else:
endpoint = '/' + endpoint
url = (self.base_subscription_url + self.base_url + endpoint
+ '?api-version={apiVersion}') + '?api-version={apiVersion}')
args = self.args.copy() args = self.args.copy()
args.update(kw) args.update(kw)
@ -105,6 +111,10 @@ class AzureCRUD:
url = self.url(**kw) url = self.url(**kw)
return self.cloud.delete(url) return self.cloud.delete(url)
def _post(self, endpoint, params, **kw):
url = self.url(endpoint=endpoint, **kw)
return self.cloud.post(url, params)
class AzureResourceGroupsCRUD(AzureCRUD): class AzureResourceGroupsCRUD(AzureCRUD):
base_url = 'resourcegroups/{resourceGroupName}' base_url = 'resourcegroups/{resourceGroupName}'
@ -144,6 +154,11 @@ class AzureResourceProviderCRUD(AzureCRUD):
return self._delete(resourceGroupName=resource_group_name, return self._delete(resourceGroupName=resource_group_name,
resourceName=name) resourceName=name)
def post(self, resource_group_name, name, endpoint, params):
return self._post(endpoint, params,
resourceGroupName=resource_group_name,
resourceName=name)
class AzureNetworkCRUD(AzureCRUD): class AzureNetworkCRUD(AzureCRUD):
base_url = ( base_url = (
@ -231,6 +246,11 @@ class AzureCloud:
providerId='Microsoft.Compute', providerId='Microsoft.Compute',
resource='disks', resource='disks',
apiVersion='2020-06-30') apiVersion='2020-06-30')
self.images = AzureResourceProviderCRUD(
self,
providerId='Microsoft.Compute',
resource='images',
apiVersion='2020-12-01')
self.resource_groups = AzureResourceGroupsCRUD( self.resource_groups = AzureResourceGroupsCRUD(
self, self,
apiVersion='2020-06-01') apiVersion='2020-06-01')
@ -252,9 +272,12 @@ class AzureCloud:
def get(self, url, codes=[200]): def get(self, url, codes=[200]):
return self.request('GET', url, None, codes) return self.request('GET', url, None, codes)
def put(self, url, data, codes=[200, 201]): def put(self, url, data, codes=[200, 201, 202]):
return self.request('PUT', url, data, codes) return self.request('PUT', url, data, codes)
def post(self, url, data, codes=[200, 202]):
return self.request('POST', url, data, codes)
def delete(self, url, codes=[200, 201, 202, 204]): def delete(self, url, codes=[200, 201, 202, 204]):
return self.request('DELETE', url, None, codes) return self.request('DELETE', url, None, codes)
@ -323,6 +346,52 @@ class AzureCloud:
if ret['status'] == 'InProgress': if ret['status'] == 'InProgress':
continue continue
if ret['status'] == 'Succeeded': if ret['status'] == 'Succeeded':
return return ret
raise Exception("Unhandled async operation result: %s", raise Exception("Unhandled async operation result: %s",
ret['status']) ret['status'])
def _upload_chunk(self, url, start, end, data):
headers = {
'x-ms-blob-type': 'PageBlob',
'x-ms-page-write': 'Update',
'Content-Length': str(len(data)),
'Range': f'bytes={start}-{end}',
}
attempts = 10
for x in range(attempts):
try:
requests.put(url, headers=headers, data=data).\
raise_for_status()
break
except Exception:
if x == attempts - 1:
raise
else:
time.sleep(2 * x)
def upload_page_blob_to_sas_url(self, url, file_object,
pagesize=(4 * 1024 * 1024),
concurrency=10):
start = 0
futures = set()
if 'comp=page' not in url:
url += '&comp=page'
with concurrent.futures.ThreadPoolExecutor(
max_workers=concurrency) as executor:
while True:
chunk = file_object.read(pagesize)
if not chunk:
break
end = start + len(chunk) - 1
future = executor.submit(self._upload_chunk, url,
start, end, chunk)
start += len(chunk)
futures.add(future)
# Keep the pool of work supplied with data but without
# reading the entire file into memory.
if len(futures) >= (concurrency * 2):
(done, futures) = concurrent.futures.wait(
futures,
return_when=concurrent.futures.FIRST_COMPLETED)
# We're done reading the file, wait for all uploads to finish
(done, futures) = concurrent.futures.wait(futures)

View File

@ -67,6 +67,43 @@ class AzureProviderCloudImage(ConfigValue):
} }
class AzureProviderDiskImage(ConfigValue):
def __init__(self, image, diskimage):
default_port_mapping = {
'ssh': 22,
'winrm': 5986,
}
self.name = image['name']
diskimage.image_types.add('vhd')
self.pause = bool(image.get('pause', False))
self.python_path = image.get('python-path')
self.username = image.get('username')
self.key = image.get('key')
self.connection_type = image.get('connection-type', 'ssh')
self.connection_port = image.get(
'connection-port',
default_port_mapping.get(self.connection_type, 22))
self.meta = {}
@property
def external_name(self):
'''Human readable version of external.'''
return self.name
@staticmethod
def getSchema():
return {
v.Required('name'): str,
'username': str,
'key': str,
'pause': bool,
'connection-type': str,
'connection-port': int,
'python-path': str,
# TODO(corvus): shell-type
}
class AzureLabel(ConfigValue): class AzureLabel(ConfigValue):
ignore_equality = ['pool'] ignore_equality = ['pool']
@ -76,15 +113,31 @@ class AzureLabel(ConfigValue):
self.name = label['name'] self.name = label['name']
self.pool = provider_pool self.pool = provider_pool
cloud_image_name = label['cloud-image'] cloud_image_name = label.get('cloud-image')
cloud_image = provider_config.cloud_images.get( if cloud_image_name:
cloud_image_name, None) cloud_image = provider_config.cloud_images.get(
if not cloud_image: cloud_image_name, None)
raise ValueError( if not cloud_image:
"cloud-image %s does not exist in provider %s" raise ValueError(
" but is referenced in label %s" % "cloud-image %s does not exist in provider %s"
(cloud_image_name, provider_config.name, self.name)) " but is referenced in label %s" %
self.cloud_image = cloud_image (cloud_image_name, provider_config.name, self.name))
self.cloud_image = cloud_image
else:
self.cloud_image = None
diskimage_name = label.get('diskimage')
if diskimage_name:
diskimage = provider_config.diskimages.get(
diskimage_name, None)
if not diskimage:
raise ValueError(
"diskimage %s does not exist in provider %s"
" but is referenced in label %s" %
(diskimage_name, provider_config.name, self.name))
self.diskimage = diskimage
else:
self.diskimage = None
self.hardware_profile = label['hardware-profile'] self.hardware_profile = label['hardware-profile']
self.tags = label.get('tags', {}) self.tags = label.get('tags', {})
@ -97,7 +150,8 @@ class AzureLabel(ConfigValue):
return { return {
v.Required('name'): str, v.Required('name'): str,
v.Required('cloud-image'): str, 'cloud-image': str,
'diskimage': str,
v.Required('hardware-profile'): azure_hardware_profile, v.Required('hardware-profile'): azure_hardware_profile,
'tags': dict, 'tags': dict,
} }
@ -160,19 +214,23 @@ class AzureProviderConfig(ProviderConfig):
@property @property
def manage_images(self): def manage_images(self):
return False return True
@staticmethod @staticmethod
def reset(): def reset():
pass pass
def load(self, config): def load(self, config):
self.image_type = 'vhd'
self.image_name_format = '{image_name}-{timestamp}'
self.post_upload_hook = self.provider.get('post-upload-hook')
self.rate_limit = self.provider.get('rate-limit', 1) self.rate_limit = self.provider.get('rate-limit', 1)
self.launch_retries = self.provider.get('launch-retries', 3) self.launch_retries = self.provider.get('launch-retries', 3)
self.boot_timeout = self.provider.get('boot-timeout', 60) self.boot_timeout = self.provider.get('boot-timeout', 60)
# TODO(corvus): remove # TODO(corvus): remove
self.zuul_public_key = self.provider['zuul-public-key'] self.zuul_public_key = self.provider.get('zuul-public-key')
self.location = self.provider['location'] self.location = self.provider['location']
self.subnet_id = self.provider.get('subnet-id') self.subnet_id = self.provider.get('subnet-id')
self.network = self.provider.get('network') self.network = self.provider.get('network')
@ -194,6 +252,12 @@ class AzureProviderConfig(ProviderConfig):
i = AzureProviderCloudImage(image, self.zuul_public_key) i = AzureProviderCloudImage(image, self.zuul_public_key)
self.cloud_images[i.name] = i self.cloud_images[i.name] = i
self.diskimages = {}
for image in self.provider['diskimages']:
diskimage = config.diskimages[image['name']]
i = AzureProviderDiskImage(image, diskimage)
self.diskimages[i.name] = i
for pool in self.provider.get('pools', []): for pool in self.provider.get('pools', []):
pp = AzurePool(self, pool) pp = AzurePool(self, pool)
self._pools[pp.name] = pp self._pools[pp.name] = pp

View File

@ -89,13 +89,44 @@ class StateMachineNodeLauncher(stats.StatsReporter):
def launch(self): def launch(self):
label = self.handler.pool.labels[self.node.type[0]] label = self.handler.pool.labels[self.node.type[0]]
if label.diskimage:
diskimage = self.provider_config.diskimages[
label.diskimage.name]
cloud_image = self.zk.getMostRecentImageUpload(
diskimage.name, self.provider_config.name)
if not cloud_image:
raise exceptions.LaunchNodepoolException(
"Unable to find current cloud image %s in %s" %
(diskimage.name, self.provider_config.name)
)
image_external_id = cloud_image.external_id
self.node.image_id = "{path}/{upload_id}".format(
path=self.zk._imageUploadPath(
cloud_image.image_name,
cloud_image.build_id,
cloud_image.provider_name),
upload_id=cloud_image.id)
image = diskimage
else:
image_external_id = None
self.node.image_id = label.cloud_image.name
image = label.cloud_image
self.node.username = image.username
self.node.python_path = image.python_path
self.node.connection_port = image.connection_port
self.node.connection_type = image.connection_type
self.zk.storeNode(self.node)
hostname = 'nodepool-' + self.node.id hostname = 'nodepool-' + self.node.id
retries = self.manager.provider.launch_retries retries = self.manager.provider.launch_retries
metadata = {'nodepool_node_id': self.node.id, metadata = {'nodepool_node_id': self.node.id,
'nodepool_pool_name': self.handler.pool.name, 'nodepool_pool_name': self.handler.pool.name,
'nodepool_provider_name': self.manager.provider.name} 'nodepool_provider_name': self.manager.provider.name}
self.state_machine = self.manager.adapter.getCreateStateMachine( self.state_machine = self.manager.adapter.getCreateStateMachine(
hostname, label, metadata, retries) hostname, label, image_external_id, metadata, retries)
def updateNodeFromInstance(self, instance): def updateNodeFromInstance(self, instance):
if instance is None: if instance is None:
@ -103,7 +134,6 @@ class StateMachineNodeLauncher(stats.StatsReporter):
node = self.node node = self.node
pool = self.handler.pool pool = self.handler.pool
label = pool.labels[self.node.type[0]]
if (pool.use_internal_ip and if (pool.use_internal_ip and
(instance.private_ipv4 or instance.private_ipv6)): (instance.private_ipv4 or instance.private_ipv6)):
@ -118,10 +148,6 @@ class StateMachineNodeLauncher(stats.StatsReporter):
node.public_ipv6 = instance.public_ipv6 node.public_ipv6 = instance.public_ipv6
node.region = instance.region node.region = instance.region
node.az = instance.az node.az = instance.az
node.username = label.cloud_image.username
node.python_path = label.cloud_image.python_path
node.connection_port = label.cloud_image.connection_port
node.connection_type = label.cloud_image.connection_type
self.zk.storeNode(node) self.zk.storeNode(node)
def runStateMachine(self): def runStateMachine(self):
@ -556,6 +582,18 @@ class StateMachineProvider(Provider, QuotaSupport):
metadata = {'nodepool_provider_name': self.provider.name} metadata = {'nodepool_provider_name': self.provider.name}
self.adapter.cleanupLeakedResources(known_nodes, metadata) self.adapter.cleanupLeakedResources(known_nodes, metadata)
# Image handling
def uploadImage(self, image_name, filename, image_type=None, meta=None,
md5=None, sha256=None):
return self.adapter.uploadImage(image_name, filename,
image_format=image_type,
metadata=meta, md5=md5,
sha256=sha256)
def deleteImage(self, name, id):
return self.adapter.deleteImage(external_id=id)
# Driver implementation # Driver implementation
@ -676,7 +714,8 @@ class Adapter:
def __init__(self, provider_config): def __init__(self, provider_config):
pass pass
def getCreateStateMachine(self, hostname, label, metadata, retries): def getCreateStateMachine(self, hostname, label,
image_external_id, metadata, retries):
"""Return a state machine suitable for creating an instance """Return a state machine suitable for creating an instance
This method should return a new state machine object This method should return a new state machine object
@ -685,6 +724,9 @@ class Adapter:
:param str hostname: The hostname of the node. :param str hostname: The hostname of the node.
:param ProviderLabel label: A config object representing the :param ProviderLabel label: A config object representing the
provider-label for the node. provider-label for the node.
:param str image_external_id: If provided, the external id of
a previously uploaded image; if None, then the adapter should
look up a cloud image based on the label.
:param metadata dict: A dictionary of metadata that must be :param metadata dict: A dictionary of metadata that must be
stored on the instance in the cloud. The same data must be stored on the instance in the cloud. The same data must be
able to be returned later on :py:class:`Instance` objects able to be returned later on :py:class:`Instance` objects
@ -767,3 +809,29 @@ class Adapter:
:returns: A :py:class:`QuotaInformation` object. :returns: A :py:class:`QuotaInformation` object.
""" """
return QuotaInformation(instances=1) return QuotaInformation(instances=1)
# The following methods must be implemented only if image
# management is supported:
def uploadImage(self, image_name, filename, image_format=None,
metadata=None, md5=None, sha256=None):
"""Upload the image to the cloud
:param image_name str: The name of the image
:param filename str: The path to the local file to be uploaded
:param image_format str: The format of the image (e.g., "qcow")
:param metadata dict: A dictionary of metadata that must be
stored on the image in the cloud.
:param md5 str: The md5 hash of the image file
:param sha256 str: The sha256 hash of the image file
:return: The external id of the image in the cloud
"""
raise NotImplementedError()
def deleteImage(self, external_id):
"""Delete an image from the cloud
:param external_id str: The external id of the image to delete
"""
raise NotImplementedError()