Merge "Azure: implement support for diskimages"

This commit is contained in:
Zuul 2021-06-21 21:17:22 +00:00 committed by Gerrit Code Review
commit e0cd1a79e2
4 changed files with 353 additions and 45 deletions

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import math
import logging
import json
@ -144,11 +145,13 @@ class AzureCreateStateMachine(statemachine.StateMachine):
PIP_QUERY = 'querying pip'
COMPLETE = 'complete'
def __init__(self, adapter, hostname, label, metadata, retries):
def __init__(self, adapter, hostname, label, image_external_id,
metadata, retries):
super().__init__()
self.adapter = adapter
self.retries = retries
self.attempts = 0
self.image_external_id = image_external_id
self.metadata = metadata
self.tags = label.tags.copy() or {}
self.tags.update(metadata)
@ -209,7 +212,8 @@ class AzureCreateStateMachine(statemachine.StateMachine):
self.nic = self.adapter._refresh(self.nic)
if self.adapter._succeeded(self.nic):
self.vm = self.adapter._createVirtualMachine(
self.label, self.tags, self.hostname, self.nic)
self.label, self.image_external_id, self.tags,
self.hostname, self.nic)
self.state = self.VM_CREATING
else:
return
@ -288,9 +292,10 @@ class AzureAdapter(statemachine.Adapter):
self.skus = {}
self._getSKUs()
def getCreateStateMachine(self, hostname, label,
                          image_external_id, metadata, retries):
    """Build the state machine that creates an instance for a node.

    Only the signature that carries ``image_external_id`` is kept; the
    older definition without it was shadowed by this one and could not
    be called with the arguments the launcher now passes.

    :param hostname: Passed through to the state machine.
    :param label: Passed through to the state machine.
    :param image_external_id: External id of a previously uploaded
        image, or None when the label's cloud image is to be used.
    :param metadata: Passed through to the state machine.
    :param retries: Passed through to the state machine.
    :return: A new :py:class:`AzureCreateStateMachine` bound to this
        adapter.
    """
    return AzureCreateStateMachine(self, hostname, label,
                                   image_external_id, metadata, retries)
def getDeleteStateMachine(self, external_id):
    """Build the state machine that deletes the given instance.

    :param external_id: Passed through to the state machine.
    :return: A new :py:class:`AzureDeleteStateMachine` bound to this
        adapter.
    """
    machine = AzureDeleteStateMachine(self, external_id)
    return machine
@ -343,6 +348,94 @@ class AzureAdapter(statemachine.Adapter):
self.provider.location))
return quota_info_from_sku(sku)
def uploadImage(self, image_name, filename, image_format,
                metadata, md5, sha256):
    """Upload a local image file and turn it into an Azure image.

    The steps are: create a disk named ``image_name`` sized for the
    file, obtain write access to it, upload the file to the returned
    SAS URL, end the access, create an image of the same name from
    the disk, and finally delete the disk.

    :param image_name: Name used for both the temporary disk and the
        resulting image.
    :param filename: Path of the local file to upload.
    :param image_format: Not used by this implementation.
    :param metadata: Stored as the tags of the disk and of the image.
    :param md5: Not used by this implementation.
    :param sha256: Not used by this implementation.
    :return: ``image_name``.
    :raises Exception: When any of the Azure operations finishes with
        a status other than ``Succeeded``.
    """
    # NOTE(review): when a step fails part-way through, nothing in
    # this method removes the disk that was already created.

    def finish(operation, failure_message):
        # Wait for the async operation and insist that it succeeded.
        outcome = self.azul.wait_for_async_operation(operation)
        if outcome['status'] != 'Succeeded':
            raise Exception(failure_message)
        return outcome

    upload_size = os.path.getsize(filename)
    disk_spec = {
        "location": self.provider.location,
        "tags": metadata,
        "properties": {
            "creationData": {
                "createOption": "Upload",
                "uploadSizeBytes": upload_size
            }
        }
    }
    self.log.debug("Creating disk for image upload")
    created = finish(
        self.azul.disks.create(self.resource_group, image_name, disk_spec),
        "Unable to create disk for image upload")
    disk_id = created['properties']['output']['id']

    write_grant = {
        "access": "Write",
        "durationInSeconds": 24 * 60 * 60,
    }
    self.log.debug("Enabling write access to disk for image upload")
    granted = finish(
        self.azul.disks.post(self.resource_group, image_name,
                             'beginGetAccess', write_grant),
        "Unable to begin write access on disk")
    sas_url = granted['properties']['output']['accessSAS']

    self.log.debug("Uploading image")
    with open(filename, "rb") as image_file:
        self.azul.upload_page_blob_to_sas_url(sas_url, image_file)

    # Ending access takes an empty request body.
    end_grant = {}
    self.log.debug("Disabling write access to disk for image upload")
    finish(
        self.azul.disks.post(self.resource_group, image_name,
                             'endGetAccess', end_grant),
        "Unable to end write access on disk")

    image_spec = {
        "location": self.provider.location,
        "tags": metadata,
        "properties": {
            "hyperVGeneration": "V2",
            "storageProfile": {
                "osDisk": {
                    "osType": "Linux",
                    "managedDisk": {
                        "id": disk_id,
                    },
                    "osState": "Generalized"
                },
                "zoneResilient": True
            }
        }
    }
    self.log.debug("Creating image from disk")
    finish(
        self.azul.images.create(self.resource_group, image_name,
                                image_spec),
        "Unable to create image from disk")

    self.log.debug("Deleting disk for image upload")
    finish(
        self.azul.disks.delete(self.resource_group, image_name),
        "Unable to delete disk for image upload")

    return image_name
def deleteImage(self, external_id):
    """Delete an image from this adapter's resource group.

    :param external_id: Name handed to the images API for deletion.
    :raises Exception: When the delete operation finishes with a
        status other than ``Succeeded``.
    """
    operation = self.azul.images.delete(self.resource_group, external_id)
    outcome = self.azul.wait_for_async_operation(operation)
    if outcome['status'] == 'Succeeded':
        return
    raise Exception("Unable to delete image")
# Local implementation below
def _metadataMatches(self, obj, metadata):
@ -401,6 +494,10 @@ class AzureAdapter(statemachine.Adapter):
self.skus[key] = sku
self.log.debug("Done querying compute SKUs")
# maxsize must be positive: a cachetools cache with maxsize=0 rejects
# every entry as "too large", so the decorated function would hit the
# API on each call instead of being cached for a day.
@cachetools.func.ttl_cache(maxsize=128, ttl=(24 * 60 * 60))
def _getImage(self, image_name):
    """Fetch an image from this adapter's resource group, with caching.

    Results are cached per ``(self, image_name)`` for 24 hours.

    :param image_name: Name of the image to look up.
    :return: Whatever ``self.azul.images.get`` returns for the image.
    """
    return self.azul.images.get(self.resource_group, image_name)
@cachetools.func.ttl_cache(maxsize=1, ttl=10)
def _listPublicIPAddresses(self):
    """List the public IP addresses in this adapter's resource group.

    The result is cached for 10 seconds.

    :return: Whatever ``self.azul.public_ip_addresses.list`` returns.
    """
    group = self.resource_group
    return self.azul.public_ip_addresses.list(group)
@ -493,31 +590,41 @@ class AzureAdapter(statemachine.Adapter):
def _listVirtualMachines(self):
return self.azul.virtual_machines.list(self.resource_group)
def _createVirtualMachine(self, label, tags, hostname, nic):
def _createVirtualMachine(self, label, image_external_id, tags,
hostname, nic):
if image_external_id:
image = label.diskimage
remote_image = self._getImage(image_external_id)
image_reference = {'id': remote_image['id']}
else:
image = label.cloud_image
image_reference = label.cloud_image.image_reference
os_profile = {'computerName': hostname}
if image.username and image.key:
linux_config = {
'ssh': {
'publicKeys': [{
'path': "/home/%s/.ssh/authorized_keys" % (
image.username),
'keyData': image.key,
}]
},
"disablePasswordAuthentication": True,
}
os_profile['adminUsername'] = image.username
os_profile['linuxConfiguration'] = linux_config
return self.azul.virtual_machines.create(
self.resource_group, hostname, {
'location': self.provider.location,
'tags': tags,
'properties': {
'osProfile': {
'computerName': hostname,
'adminUsername': label.cloud_image.username,
'linuxConfiguration': {
'ssh': {
'publicKeys': [{
'path': "/home/%s/.ssh/authorized_keys" % (
label.cloud_image.username),
'keyData': label.cloud_image.key,
}]
},
"disablePasswordAuthentication": True,
}
},
'osProfile': os_profile,
'hardwareProfile': {
'vmSize': label.hardware_profile["vm-size"]
},
'storageProfile': {
'imageReference': label.cloud_image.image_reference
'imageReference': image_reference,
},
'networkProfile': {
'networkInterfaces': [{

View File

@ -12,10 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import requests
import concurrent.futures
import logging
import time
import requests
class AzureAuth(requests.auth.AuthBase):
AUTH_URL = "https://login.microsoftonline.com/{tenantId}/oauth2/token"
@ -68,8 +70,12 @@ class AzureCRUD:
self.args = kw.copy()
self.args.update(self.cloud.credential)
def url(self, **kw):
url = (self.base_subscription_url + self.base_url
def url(self, endpoint=None, **kw):
if endpoint is None:
endpoint = ''
else:
endpoint = '/' + endpoint
url = (self.base_subscription_url + self.base_url + endpoint
+ '?api-version={apiVersion}')
args = self.args.copy()
args.update(kw)
@ -105,6 +111,10 @@ class AzureCRUD:
url = self.url(**kw)
return self.cloud.delete(url)
def _post(self, endpoint, params, **kw):
    """POST ``params`` to a sub-endpoint of this resource.

    :param endpoint: Forwarded to :py:meth:`url` as ``endpoint``.
    :param params: Request body handed to ``self.cloud.post``.
    :param kw: Extra keyword arguments forwarded to :py:meth:`url`.
    :return: Whatever ``self.cloud.post`` returns.
    """
    return self.cloud.post(self.url(endpoint=endpoint, **kw), params)
class AzureResourceGroupsCRUD(AzureCRUD):
base_url = 'resourcegroups/{resourceGroupName}'
@ -144,6 +154,11 @@ class AzureResourceProviderCRUD(AzureCRUD):
return self._delete(resourceGroupName=resource_group_name,
resourceName=name)
def post(self, resource_group_name, name, endpoint, params):
    """POST ``params`` to a sub-endpoint of a named resource.

    :param resource_group_name: Sent on as ``resourceGroupName``.
    :param name: Sent on as ``resourceName``.
    :param endpoint: Sub-endpoint handed to :py:meth:`_post`.
    :param params: Request body handed to :py:meth:`_post`.
    :return: Whatever :py:meth:`_post` returns.
    """
    identifiers = {
        'resourceGroupName': resource_group_name,
        'resourceName': name,
    }
    return self._post(endpoint, params, **identifiers)
class AzureNetworkCRUD(AzureCRUD):
base_url = (
@ -231,6 +246,11 @@ class AzureCloud:
providerId='Microsoft.Compute',
resource='disks',
apiVersion='2020-06-30')
self.images = AzureResourceProviderCRUD(
self,
providerId='Microsoft.Compute',
resource='images',
apiVersion='2020-12-01')
self.resource_groups = AzureResourceGroupsCRUD(
self,
apiVersion='2020-06-01')
@ -252,9 +272,12 @@ class AzureCloud:
def get(self, url, codes=[200]):
    """Send a GET request without a body through :py:meth:`request`.

    :param url: Target URL.
    :param codes: Handed to :py:meth:`request` unchanged; presumably
        the status codes treated as success (confirm in ``request``).
    :return: Whatever :py:meth:`request` returns.
    """
    response = self.request('GET', url, None, codes)
    return response
def put(self, url, data, codes=[200, 201, 202]):
    """Send a PUT request with a body through :py:meth:`request`.

    Only the definition whose default includes 202 is kept; the older
    header line with ``[200, 201]`` was a leftover beside it.

    :param url: Target URL.
    :param data: Request body.
    :param codes: Handed to :py:meth:`request` unchanged; presumably
        the status codes treated as success (confirm in ``request``).
    :return: Whatever :py:meth:`request` returns.
    """
    return self.request('PUT', url, data, codes)
def post(self, url, data, codes=[200, 202]):
    """Send a POST request with a body through :py:meth:`request`.

    :param url: Target URL.
    :param data: Request body.
    :param codes: Handed to :py:meth:`request` unchanged; presumably
        the status codes treated as success (confirm in ``request``).
    :return: Whatever :py:meth:`request` returns.
    """
    response = self.request('POST', url, data, codes)
    return response
def delete(self, url, codes=[200, 201, 202, 204]):
    """Send a DELETE request without a body through :py:meth:`request`.

    :param url: Target URL.
    :param codes: Handed to :py:meth:`request` unchanged; presumably
        the status codes treated as success (confirm in ``request``).
    :return: Whatever :py:meth:`request` returns.
    """
    response = self.request('DELETE', url, None, codes)
    return response
@ -323,6 +346,52 @@ class AzureCloud:
if ret['status'] == 'InProgress':
continue
if ret['status'] == 'Succeeded':
return
return ret
raise Exception("Unhandled async operation result: %s",
ret['status'])
def _upload_chunk(self, url, start, end, data):
    """PUT one byte range of a page blob, retrying on failure.

    Up to 10 attempts are made.  After failed attempt number ``x``
    (counting from 0) the method sleeps ``2 * x`` seconds before the
    next try; the exception from the last attempt is re-raised.

    :param url: URL the chunk is PUT to.
    :param start: First byte offset, used in the ``Range`` header.
    :param end: Last byte offset (inclusive), used in the ``Range``
        header.
    :param data: The bytes to send.
    """
    max_attempts = 10
    request_headers = {
        'x-ms-blob-type': 'PageBlob',
        'x-ms-page-write': 'Update',
        'Content-Length': str(len(data)),
        'Range': f'bytes={start}-{end}',
    }
    attempt = 0
    while True:
        try:
            response = requests.put(url, headers=request_headers, data=data)
            response.raise_for_status()
            return
        except Exception:
            # Out of attempts: let the caller see the last error.
            if attempt == max_attempts - 1:
                raise
            time.sleep(2 * attempt)
        attempt += 1
def upload_page_blob_to_sas_url(self, url, file_object,
                                pagesize=(4 * 1024 * 1024),
                                concurrency=10):
    """Upload a file to a page blob in chunks using a thread pool.

    The file is read ``pagesize`` bytes at a time and each chunk is
    handed to :py:meth:`_upload_chunk` on a worker thread.  At most
    ``concurrency * 2`` chunks are outstanding at once so the whole
    file is never held in memory.

    The result of every chunk is checked: if a chunk upload raises,
    that exception propagates to the caller instead of being silently
    dropped, and chunks that have not started yet are cancelled.
    Chunks already running are still allowed to finish because the
    executor waits for them on exit.

    :param url: SAS URL of the blob.  ``&comp=page`` is appended when
        the URL does not already contain ``comp=page`` (this assumes
        the URL already has a query string).
    :param file_object: Object with a ``read(size)`` method returning
        bytes; read until it returns an empty value.
    :param pagesize: Number of bytes per chunk.
    :param concurrency: Number of worker threads.
    :raises Exception: Whatever a failed chunk upload raised.
    """
    start = 0
    futures = set()
    if 'comp=page' not in url:
        url += '&comp=page'
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=concurrency) as executor:
        try:
            while True:
                chunk = file_object.read(pagesize)
                if not chunk:
                    break
                end = start + len(chunk) - 1
                futures.add(executor.submit(self._upload_chunk, url,
                                            start, end, chunk))
                start += len(chunk)
                # Keep the pool of work supplied with data but without
                # reading the entire file into memory.
                if len(futures) >= (concurrency * 2):
                    (done, futures) = concurrent.futures.wait(
                        futures,
                        return_when=concurrent.futures.FIRST_COMPLETED)
                    for future in done:
                        # Re-raises the chunk's exception, if any.
                        future.result()
            # We're done reading the file, wait for all uploads to
            # finish and surface any failure.
            (done, futures) = concurrent.futures.wait(futures)
            for future in done:
                future.result()
        except BaseException:
            # Don't start chunks that are still queued; the upload has
            # already failed.
            for future in futures:
                future.cancel()
            raise

View File

@ -67,6 +67,43 @@ class AzureProviderCloudImage(ConfigValue):
}
class AzureProviderDiskImage(ConfigValue):
    """Provider-level settings for one diskimage entry.

    Attributes set from the entry: ``name``, ``pause``,
    ``python_path``, ``username``, ``key``, ``connection_type`` and
    ``connection_port``.  ``meta`` starts out as an empty dict.
    """

    def __init__(self, image, diskimage):
        """Read the settings from a provider diskimage entry.

        :param image: Mapping holding the entry; ``name`` is required,
            the other keys are optional.
        :param diskimage: Object whose ``image_types`` set receives
            ``'vhd'`` as a side effect.
        """
        # Port used when the entry has no explicit 'connection-port';
        # connection types not listed here fall back to 22.
        fallback_ports = {
            'ssh': 22,
            'winrm': 5986,
        }
        self.name = image['name']
        diskimage.image_types.add('vhd')
        self.pause = bool(image.get('pause', False))
        self.python_path = image.get('python-path')
        self.username = image.get('username')
        self.key = image.get('key')
        self.connection_type = image.get('connection-type', 'ssh')
        implied_port = fallback_ports.get(self.connection_type, 22)
        self.connection_port = image.get('connection-port', implied_port)
        self.meta = {}

    @property
    def external_name(self):
        '''Human readable version of external.'''
        return self.name

    @staticmethod
    def getSchema():
        """Return the validation schema for a provider diskimage entry."""
        schema = {
            v.Required('name'): str,
            'username': str,
            'key': str,
            'pause': bool,
            'connection-type': str,
            'connection-port': int,
            'python-path': str,
            # TODO(corvus): shell-type
        }
        return schema
class AzureLabel(ConfigValue):
ignore_equality = ['pool']
@ -76,15 +113,31 @@ class AzureLabel(ConfigValue):
self.name = label['name']
self.pool = provider_pool
cloud_image_name = label['cloud-image']
cloud_image = provider_config.cloud_images.get(
cloud_image_name, None)
if not cloud_image:
raise ValueError(
"cloud-image %s does not exist in provider %s"
" but is referenced in label %s" %
(cloud_image_name, provider_config.name, self.name))
self.cloud_image = cloud_image
cloud_image_name = label.get('cloud-image')
if cloud_image_name:
cloud_image = provider_config.cloud_images.get(
cloud_image_name, None)
if not cloud_image:
raise ValueError(
"cloud-image %s does not exist in provider %s"
" but is referenced in label %s" %
(cloud_image_name, provider_config.name, self.name))
self.cloud_image = cloud_image
else:
self.cloud_image = None
diskimage_name = label.get('diskimage')
if diskimage_name:
diskimage = provider_config.diskimages.get(
diskimage_name, None)
if not diskimage:
raise ValueError(
"diskimage %s does not exist in provider %s"
" but is referenced in label %s" %
(diskimage_name, provider_config.name, self.name))
self.diskimage = diskimage
else:
self.diskimage = None
self.hardware_profile = label['hardware-profile']
self.tags = label.get('tags', {})
@ -97,7 +150,8 @@ class AzureLabel(ConfigValue):
return {
v.Required('name'): str,
v.Required('cloud-image'): str,
'cloud-image': str,
'diskimage': str,
v.Required('hardware-profile'): azure_hardware_profile,
'tags': dict,
}
@ -160,19 +214,23 @@ class AzureProviderConfig(ProviderConfig):
@property
def manage_images(self):
    """Report that this provider manages images.

    The earlier ``return False`` that preceded this line made the
    intended ``return True`` unreachable; only the new value is kept.
    """
    return True
@staticmethod
def reset():
    """Do nothing; there is no state to reset here."""
    return None
def load(self, config):
self.image_type = 'vhd'
self.image_name_format = '{image_name}-{timestamp}'
self.post_upload_hook = self.provider.get('post-upload-hook')
self.rate_limit = self.provider.get('rate-limit', 1)
self.launch_retries = self.provider.get('launch-retries', 3)
self.boot_timeout = self.provider.get('boot-timeout', 60)
# TODO(corvus): remove
self.zuul_public_key = self.provider['zuul-public-key']
self.zuul_public_key = self.provider.get('zuul-public-key')
self.location = self.provider['location']
self.subnet_id = self.provider.get('subnet-id')
self.network = self.provider.get('network')
@ -194,6 +252,12 @@ class AzureProviderConfig(ProviderConfig):
i = AzureProviderCloudImage(image, self.zuul_public_key)
self.cloud_images[i.name] = i
self.diskimages = {}
for image in self.provider['diskimages']:
diskimage = config.diskimages[image['name']]
i = AzureProviderDiskImage(image, diskimage)
self.diskimages[i.name] = i
for pool in self.provider.get('pools', []):
pp = AzurePool(self, pool)
self._pools[pp.name] = pp

View File

@ -89,13 +89,44 @@ class StateMachineNodeLauncher(stats.StatsReporter):
def launch(self):
    """Record image details on the node and start instance creation.

    When the label refers to a diskimage, the most recent upload of
    that image for this provider is looked up and its external id is
    handed to the adapter; otherwise the label's cloud image is used
    and no external id is passed.  The node's image id, username,
    python path and connection settings are stored before the create
    state machine is requested from the adapter.

    The call to ``getCreateStateMachine`` uses only the argument list
    that includes ``image_external_id``; the older argument line left
    beside it has been dropped.

    :raises exceptions.LaunchNodepoolException: When the label uses a
        diskimage for which no upload is found.
    """
    label = self.handler.pool.labels[self.node.type[0]]
    if label.diskimage:
        diskimage = self.provider_config.diskimages[
            label.diskimage.name]
        cloud_image = self.zk.getMostRecentImageUpload(
            diskimage.name, self.provider_config.name)
        if not cloud_image:
            raise exceptions.LaunchNodepoolException(
                "Unable to find current cloud image %s in %s" %
                (diskimage.name, self.provider_config.name)
            )
        image_external_id = cloud_image.external_id
        self.node.image_id = "{path}/{upload_id}".format(
            path=self.zk._imageUploadPath(
                cloud_image.image_name,
                cloud_image.build_id,
                cloud_image.provider_name),
            upload_id=cloud_image.id)
        image = diskimage
    else:
        image_external_id = None
        self.node.image_id = label.cloud_image.name
        image = label.cloud_image

    # Connection details come from whichever image object was chosen.
    self.node.username = image.username
    self.node.python_path = image.python_path
    self.node.connection_port = image.connection_port
    self.node.connection_type = image.connection_type
    self.zk.storeNode(self.node)

    hostname = 'nodepool-' + self.node.id
    retries = self.manager.provider.launch_retries
    metadata = {'nodepool_node_id': self.node.id,
                'nodepool_pool_name': self.handler.pool.name,
                'nodepool_provider_name': self.manager.provider.name}
    self.state_machine = self.manager.adapter.getCreateStateMachine(
        hostname, label, image_external_id, metadata, retries)
def updateNodeFromInstance(self, instance):
if instance is None:
@ -103,7 +134,6 @@ class StateMachineNodeLauncher(stats.StatsReporter):
node = self.node
pool = self.handler.pool
label = pool.labels[self.node.type[0]]
if (pool.use_internal_ip and
(instance.private_ipv4 or instance.private_ipv6)):
@ -118,10 +148,6 @@ class StateMachineNodeLauncher(stats.StatsReporter):
node.public_ipv6 = instance.public_ipv6
node.region = instance.region
node.az = instance.az
node.username = label.cloud_image.username
node.python_path = label.cloud_image.python_path
node.connection_port = label.cloud_image.connection_port
node.connection_type = label.cloud_image.connection_type
self.zk.storeNode(node)
def runStateMachine(self):
@ -556,6 +582,18 @@ class StateMachineProvider(Provider, QuotaSupport):
metadata = {'nodepool_provider_name': self.provider.name}
self.adapter.cleanupLeakedResources(known_nodes, metadata)
# Image handling
def uploadImage(self, image_name, filename, image_type=None, meta=None,
                md5=None, sha256=None):
    """Hand an image upload over to the adapter.

    ``image_type`` and ``meta`` are passed to the adapter under the
    names ``image_format`` and ``metadata``.

    :param image_name: Name of the image.
    :param filename: Path of the local file to upload.
    :param image_type: Forwarded as ``image_format``.
    :param meta: Forwarded as ``metadata``.
    :param md5: Forwarded unchanged.
    :param sha256: Forwarded unchanged.
    :return: Whatever the adapter's ``uploadImage`` returns.
    """
    adapter_options = {
        'image_format': image_type,
        'metadata': meta,
        'md5': md5,
        'sha256': sha256,
    }
    return self.adapter.uploadImage(image_name, filename,
                                    **adapter_options)
def deleteImage(self, name, id):
    """Hand an image deletion over to the adapter.

    :param name: Not used here.
    :param id: Forwarded to the adapter as ``external_id``.
    :return: Whatever the adapter's ``deleteImage`` returns.
    """
    outcome = self.adapter.deleteImage(external_id=id)
    return outcome
# Driver implementation
@ -676,7 +714,8 @@ class Adapter:
def __init__(self, provider_config):
pass
def getCreateStateMachine(self, hostname, label, metadata, retries):
def getCreateStateMachine(self, hostname, label,
image_external_id, metadata, retries):
"""Return a state machine suitable for creating an instance
This method should return a new state machine object
@ -685,6 +724,9 @@ class Adapter:
:param str hostname: The hostname of the node.
:param ProviderLabel label: A config object representing the
provider-label for the node.
:param str image_external_id: If provided, the external id of
a previously uploaded image; if None, then the adapter should
look up a cloud image based on the label.
:param metadata dict: A dictionary of metadata that must be
stored on the instance in the cloud. The same data must be
able to be returned later on :py:class:`Instance` objects
@ -767,3 +809,29 @@ class Adapter:
:returns: A :py:class:`QuotaInformation` object.
"""
return QuotaInformation(instances=1)
# The following methods must be implemented only if image
# management is supported:
def uploadImage(self, image_name, filename, image_format=None,
                metadata=None, md5=None, sha256=None):
    """Upload the image to the cloud

    Adapters that support image management override this method; the
    base implementation always raises.

    :param str image_name: The name of the image
    :param str filename: The path to the local file to be uploaded
    :param str image_format: The format of the image (e.g., "qcow")
    :param dict metadata: A dictionary of metadata that must be
        stored on the image in the cloud.
    :param str md5: The md5 hash of the image file
    :param str sha256: The sha256 hash of the image file

    :return: The external id of the image in the cloud
    :raises NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError()
def deleteImage(self, external_id):
    """Delete an image from the cloud

    Adapters that support image management override this method; the
    base implementation always raises.

    :param str external_id: The external id of the image to delete
    :raises NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError()