Merge "Azure: replace driver with state machine driver"

This commit is contained in:
Zuul 2021-06-25 21:57:03 +00:00 committed by Gerrit Code Review
commit 738c4e50c5
14 changed files with 767 additions and 1526 deletions

View File

@ -1,27 +1,25 @@
# Copyright 2018 Red Hat
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nodepool.driver import Driver
from nodepool.driver.statemachine import StateMachineDriver
from nodepool.driver.azure.config import AzureProviderConfig
from nodepool.driver.azure.provider import AzureProvider
from nodepool.driver.azure.adapter import AzureAdapter
# Driver entry point for Azure: builds the provider config and the
# provider/adapter objects from configuration.
# NOTE(review): two class headers and both getProvider/getAdapter appear
# here; this looks like the old and new sides of a diff rendered without
# +/- markers (Driver -> StateMachineDriver) -- confirm before relying on it.
class AzureDriver(Driver):
class AzureDriver(StateMachineDriver):
# Return an AzureProviderConfig built from this driver and `provider`.
def getProviderConfig(self, provider):
return AzureProviderConfig(self, provider)
# Return an AzureProvider built from `provider_config`.
def getProvider(self, provider_config):
return AzureProvider(provider_config)
# Return an AzureAdapter built from `provider_config`.
def getAdapter(self, provider_config):
return AzureAdapter(provider_config)

View File

@ -12,10 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import requests
import concurrent.futures
import logging
import time
import requests
class AzureAuth(requests.auth.AuthBase):
AUTH_URL = "https://login.microsoftonline.com/{tenantId}/oauth2/token"
@ -48,109 +50,159 @@ class AzureAuth(requests.auth.AuthBase):
# Exception raised for failed Azure API calls; keeps the HTTP status code
# (and, with the three-argument signature, the error code) as attributes.
# NOTE(review): two __init__ signatures appear back to back; presumably the
# old and new sides of a diff -- the body below matches the second one.
class AzureError(Exception):
def __init__(self, status_code, message):
def __init__(self, status_code, error_code, message):
# Only the message is passed to Exception; the codes are stored separately.
super().__init__(message)
self.error_code = error_code
self.status_code = status_code
class AzureNotFoundError(AzureError):
def __init__(self, status_code, message):
super().__init__(status_code, message)
class AzureResourceGroupsCRUD:
    """Create/read/delete helper for resource groups.

    `cloud` must provide `credential` (a mapping that supplies at least
    `subscriptionId`) and the `get`, `put`, `delete` and `paginate`
    methods; `version` is sent as the `api-version` query parameter.
    """

    def __init__(self, cloud, version):
        self.cloud = cloud
        self.version = version

    def url(self, url, **args):
        """Return the full request URL for the path fragment `url`.

        The whole URL (including the fragment) is passed through
        `str.format` with the caller's keyword arguments, then the cloud
        credential, then `apiVersion`; later values win.
        """
        template = ''.join((
            'https://management.azure.com/subscriptions/{subscriptionId}'
            '/resourcegroups/',
            url,
            '?api-version={apiVersion}',
        ))
        values = dict(args)
        values.update(self.cloud.credential)
        values['apiVersion'] = self.version
        return template.format(**values)

    def list(self):
        """Return the paginated list obtained from the collection URL."""
        first_page = self.cloud.get(self.url(''))
        return self.cloud.paginate(first_page)

    def get(self, name):
        """GET the resource group called `name`."""
        return self.cloud.get(self.url(name))

    def create(self, name, params):
        """PUT `params` to the resource group called `name`."""
        return self.cloud.put(self.url(name), params)

    def delete(self, name):
        """DELETE the resource group called `name`."""
        return self.cloud.delete(self.url(name))
pass
class AzureCRUD:
def __init__(self, cloud, resource, version):
self.cloud = cloud
self.resource = resource
self.version = version
base_subscription_url = (
'https://management.azure.com/subscriptions/{subscriptionId}/')
base_url = ''
def url(self, url, **args):
base_url = (
'https://management.azure.com/subscriptions/{subscriptionId}'
'/resourceGroups/{resourceGroupName}/providers/')
url = base_url + url + '?api-version={apiVersion}'
args = args.copy()
args.update(self.cloud.credential)
args['apiVersion'] = self.version
def __init__(self, cloud, **kw):
self.cloud = cloud
self.args = kw.copy()
self.args.update(self.cloud.credential)
def url(self, endpoint=None, **kw):
if endpoint is None:
endpoint = ''
else:
endpoint = '/' + endpoint
url = (self.base_subscription_url + self.base_url + endpoint
+ '?api-version={apiVersion}')
args = self.args.copy()
args.update(kw)
return url.format(**args)
def id_url(self, url, **args):
def id_url(self, url, **kw):
base_url = 'https://management.azure.com'
url = base_url + url + '?api-version={apiVersion}'
args = args.copy()
args['apiVersion'] = self.version
args = self.args.copy()
args.update(kw)
return url.format(**args)
def list(self, resource_group_name):
url = self.url(
self.resource,
resourceGroupName=resource_group_name,
)
return self.cloud.paginate(self.cloud.get(url))
def get_by_id(self, resource_id):
url = self.id_url(resource_id)
return self.cloud.get(url)
def get(self, resource_group_name, name):
url = self.url(
'{_resource}/{_resourceName}',
_resource=self.resource,
_resourceName=name,
resourceGroupName=resource_group_name,
)
def _list(self, **kw):
url = self.url(**kw)
return self.cloud.paginate(self.cloud.get(url))
def list(self):
return self._list()
def _get(self, **kw):
url = self.url(**kw)
return self.cloud.get(url)
def create(self, resource_group_name, name, params):
url = self.url(
'{_resource}/{_resourceName}',
_resource=self.resource,
_resourceName=name,
resourceGroupName=resource_group_name,
)
def _create(self, params, **kw):
url = self.url(**kw)
return self.cloud.put(url, params)
def delete(self, resource_group_name, name):
url = self.url(
'{_resource}/{_resourceName}',
_resource=self.resource,
_resourceName=name,
resourceGroupName=resource_group_name,
)
def _delete(self, **kw):
url = self.url(**kw)
return self.cloud.delete(url)
def _post(self, endpoint, params, **kw):
url = self.url(endpoint=endpoint, **kw)
return self.cloud.post(url, params)
class AzureResourceGroupsCRUD(AzureCRUD):
    """CRUD operations on resource groups, addressed by name only."""

    base_url = 'resourcegroups/{resourceGroupName}'

    def list(self):
        """List resource groups by formatting the URL with an empty name."""
        collection = {'resourceGroupName': ''}
        return self._list(**collection)

    def get(self, name):
        """Fetch the resource group called `name`."""
        target = {'resourceGroupName': name}
        return self._get(**target)

    def create(self, name, params):
        """Create the resource group called `name` from `params`."""
        target = {'resourceGroupName': name}
        return self._create(params, **target)

    def delete(self, name):
        """Delete the resource group called `name`."""
        target = {'resourceGroupName': name}
        return self._delete(**target)
class AzureResourceProviderCRUD(AzureCRUD):
    """CRUD operations on a provider resource inside a resource group.

    `providerId` and `resource` in the URL template are expected to come
    from the keyword arguments given at construction time; the resource
    group and resource name are supplied per call.
    """

    base_url = (
        'resourceGroups/{resourceGroupName}/providers/'
        '{providerId}/{resource}/{resourceName}')

    def list(self, resource_group_name):
        """List resources in the group (empty resource name)."""
        target = {'resourceGroupName': resource_group_name,
                  'resourceName': ''}
        return self._list(**target)

    def get(self, resource_group_name, name):
        """Fetch the resource `name` in `resource_group_name`."""
        target = {'resourceGroupName': resource_group_name,
                  'resourceName': name}
        return self._get(**target)

    def create(self, resource_group_name, name, params):
        """Create the resource `name` in `resource_group_name`."""
        target = {'resourceGroupName': resource_group_name,
                  'resourceName': name}
        return self._create(params, **target)

    def delete(self, resource_group_name, name):
        """Delete the resource `name` in `resource_group_name`."""
        target = {'resourceGroupName': resource_group_name,
                  'resourceName': name}
        return self._delete(**target)

    def post(self, resource_group_name, name, endpoint, params):
        """POST `params` to the sub-path `endpoint` of the resource."""
        target = {'resourceGroupName': resource_group_name,
                  'resourceName': name}
        return self._post(endpoint, params, **target)
class AzureNetworkCRUD(AzureCRUD):
    """CRUD operations on resources nested under a virtual network."""

    base_url = (
        'resourceGroups/{resourceGroupName}/providers/'
        'Microsoft.Network/virtualNetworks/{virtualNetworkName}/'
        '{resource}/{resourceName}')

    def list(self, resource_group_name, virtual_network_name):
        """List the network's resources (empty resource name)."""
        target = {'resourceGroupName': resource_group_name,
                  'virtualNetworkName': virtual_network_name,
                  'resourceName': ''}
        return self._list(**target)

    def get(self, resource_group_name, virtual_network_name, name):
        """Fetch the resource `name` under the given virtual network."""
        target = {'resourceGroupName': resource_group_name,
                  'virtualNetworkName': virtual_network_name,
                  'resourceName': name}
        return self._get(**target)

    def create(self, resource_group_name, virtual_network_name, name, params):
        """Create the resource `name` under the given virtual network."""
        target = {'resourceGroupName': resource_group_name,
                  'virtualNetworkName': virtual_network_name,
                  'resourceName': name}
        return self._create(params, **target)

    def delete(self, resource_group_name, virtual_network_name, name):
        """Delete the resource `name` under the given virtual network."""
        target = {'resourceGroupName': resource_group_name,
                  'virtualNetworkName': virtual_network_name,
                  'resourceName': name}
        return self._delete(**target)
class AzureLocationCRUD(AzureCRUD):
    """Read-only listing of a provider resource scoped to a location."""

    base_url = 'providers/{providerId}/locations/{location}/{resource}'

    def list(self, location):
        """List the resource for the given `location`."""
        scope = {'location': location}
        return self._list(**scope)
class AzureProviderCRUD(AzureCRUD):
    """Read-only listing of a subscription-wide provider resource."""

    base_url = 'providers/{providerId}/{resource}/'

    def list(self):
        """List the resource; no per-call URL arguments are needed."""
        return self._list()
class AzureDictResponse(dict):
def __init__(self, response, *args):
@ -174,32 +226,58 @@ class AzureCloud:
self.session = requests.Session()
self.log = logging.getLogger("azul")
self.auth = AzureAuth(credential)
self.network_interfaces = AzureCRUD(
self.network_interfaces = AzureResourceProviderCRUD(
self,
'Microsoft.Network/networkInterfaces',
'2020-07-01')
self.public_ip_addresses = AzureCRUD(
providerId='Microsoft.Network',
resource='networkInterfaces',
apiVersion='2020-07-01')
self.public_ip_addresses = AzureResourceProviderCRUD(
self,
'Microsoft.Network/publicIPAddresses',
'2020-07-01')
self.virtual_machines = AzureCRUD(
providerId='Microsoft.Network',
resource='publicIPAddresses',
apiVersion='2020-07-01')
self.virtual_machines = AzureResourceProviderCRUD(
self,
'Microsoft.Compute/virtualMachines',
'2020-12-01')
self.disks = AzureCRUD(
providerId='Microsoft.Compute',
resource='virtualMachines',
apiVersion='2020-12-01')
self.disks = AzureResourceProviderCRUD(
self,
'Microsoft.Compute/disks',
'2020-06-30')
providerId='Microsoft.Compute',
resource='disks',
apiVersion='2020-06-30')
self.images = AzureResourceProviderCRUD(
self,
providerId='Microsoft.Compute',
resource='images',
apiVersion='2020-12-01')
self.resource_groups = AzureResourceGroupsCRUD(
self,
'2020-06-01')
apiVersion='2020-06-01')
self.subnets = AzureNetworkCRUD(
self,
resource='subnets',
apiVersion='2020-07-01')
self.compute_usages = AzureLocationCRUD(
self,
providerId='Microsoft.Compute',
resource='usages',
apiVersion='2020-12-01')
self.compute_skus = AzureProviderCRUD(
self,
providerId='Microsoft.Compute',
resource='skus',
apiVersion='2019-04-01')
# Issue a GET for `url` via self.request, with no request body.
# NOTE(review): `codes` is presumably the list of HTTP status codes treated
# as success; request() is not visible here -- confirm.
def get(self, url, codes=[200]):
return self.request('GET', url, None, codes)
# Issue a PUT of `data` to `url` via self.request.
# NOTE(review): two signatures appear back to back (the second also lists
# 202); presumably the old and new sides of a diff -- confirm.
def put(self, url, data, codes=[200, 201]):
def put(self, url, data, codes=[200, 201, 202]):
return self.request('PUT', url, data, codes)
# Issue a POST of `data` to `url` via self.request.
# NOTE(review): `codes` is presumably the accepted status codes -- confirm
# against request(), which is not visible here.
def post(self, url, data, codes=[200, 202]):
return self.request('POST', url, data, codes)
# Issue a DELETE for `url` via self.request, with no request body.
# NOTE(review): `codes` is presumably the accepted status codes -- confirm
# against request(), which is not visible here.
def delete(self, url, codes=[200, 201, 202, 204]):
return self.request('DELETE', url, None, codes)
@ -226,9 +304,13 @@ class AzureCloud:
self.log.error(response.text)
if response.status_code == 404:
raise AzureNotFoundError(
response.status_code, err['error']['message'])
response.status_code,
err['error']['code'],
err['error']['message'])
else:
raise AzureError(response.status_code, err['error']['message'])
raise AzureError(response.status_code,
err['error']['code'],
err['error']['message'])
def paginate(self, data):
ret = data['value']
@ -264,6 +346,52 @@ class AzureCloud:
if ret['status'] == 'InProgress':
continue
if ret['status'] == 'Succeeded':
return
return ret
raise Exception("Unhandled async operation result: %s",
ret['status'])
def _upload_chunk(self, url, start, end, data):
    """PUT one byte range of a page blob, retrying on any failure.

    `start` and `end` are the inclusive byte offsets sent in the Range
    header and `data` is the payload for that range.  The request is
    tried up to ten times; after a failed attempt number `k` (counting
    from zero) the method sleeps `2 * k` seconds, and the exception from
    the final attempt is re-raised.
    """
    max_attempts = 10
    request_headers = {
        'x-ms-blob-type': 'PageBlob',
        'x-ms-page-write': 'Update',
        'Content-Length': str(len(data)),
        'Range': f'bytes={start}-{end}',
    }
    for attempt in range(max_attempts):
        try:
            response = requests.put(url, headers=request_headers, data=data)
            response.raise_for_status()
        except Exception:
            # Out of retries: let the caller see the last error.
            if attempt == max_attempts - 1:
                raise
            time.sleep(2 * attempt)
        else:
            break
def upload_page_blob_to_sas_url(self, url, file_object,
                                pagesize=(4 * 1024 * 1024),
                                concurrency=10):
    """Upload `file_object` to a page blob SAS URL in parallel chunks.

    The file is read `pagesize` bytes at a time and each chunk is handed
    to `self._upload_chunk` on a thread pool of `concurrency` workers.
    At most `concurrency * 2` chunks are kept in flight so the whole
    file is never held in memory.

    :param url: SAS URL of the blob; `&comp=page` is appended when the
        URL does not already contain `comp=page`.
    :param file_object: object with a `read(size)` method returning bytes.
    :param pagesize: number of bytes per uploaded chunk.
    :param concurrency: number of upload worker threads.
    :raises Exception: whatever `_upload_chunk` raised for a chunk that
        could not be uploaded.  Previously such failures were silently
        discarded and the upload appeared to succeed.
    """
    start = 0
    futures = set()
    if 'comp=page' not in url:
        url += '&comp=page'
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=concurrency) as executor:
        while True:
            chunk = file_object.read(pagesize)
            if not chunk:
                break
            end = start + len(chunk) - 1
            future = executor.submit(self._upload_chunk, url,
                                     start, end, chunk)
            start += len(chunk)
            futures.add(future)
            # Keep the pool of work supplied with data but without
            # reading the entire file into memory.
            if len(futures) >= (concurrency * 2):
                (done, futures) = concurrent.futures.wait(
                    futures,
                    return_when=concurrent.futures.FIRST_COMPLETED)
                # result() re-raises a worker's exception; without this
                # a failed chunk would go unnoticed.
                for finished in done:
                    finished.result()
        # We're done reading the file, wait for all uploads to finish
        (done, futures) = concurrent.futures.wait(futures)
        for finished in done:
            finished.result()

View File

@ -1,4 +1,5 @@
# Copyright 2018 Red Hat
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -23,142 +24,29 @@ from nodepool.driver import ProviderConfig
class AzureProviderCloudImage(ConfigValue):
def __init__(self):
self.name = None
self.image_id = None
self.username = None
self.key = None
self.python_path = None
self.connection_type = None
self.connection_port = None
def __eq__(self, other):
if isinstance(other, AzureProviderCloudImage):
return (self.name == other.name
and self.image_id == other.image_id
and self.username == other.username
and self.key == other.key
and self.python_path == other.python_path
and self.connection_type == other.connection_type
and self.connection_port == other.connection_port)
return False
def __repr__(self):
return "<AzureProviderCloudImage %s>" % self.name
def __init__(self, image, zuul_public_key):
default_port_mapping = {
'ssh': 22,
'winrm': 5986,
}
self.name = image['name']
self.username = image['username']
# TODO(corvus): remove zuul_public_key
self.key = image.get('key', zuul_public_key)
self.image_reference = image['image-reference']
self.python_path = image.get('python-path')
self.connection_type = image.get('connection-type', 'ssh')
self.connection_port = image.get(
'connection-port',
default_port_mapping.get(self.connection_type, 22))
@property
def external_name(self):
'''Human readable version of external.'''
return self.image_id or self.name
class AzureLabel(ConfigValue):
def __eq__(self, other):
if (other.cloud_image != self.cloud_image or
other.hardware_profile != self.hardware_profile):
return False
return True
class AzurePool(ConfigPool):
def __eq__(self, other):
if other.labels != self.labels:
return False
return True
def __repr__(self):
return "<AzurePool %s>" % self.name
def load(self, pool_config):
pass
class AzureProviderConfig(ProviderConfig):
def __init__(self, driver, provider):
self._pools = {}
self.driver_object = driver
super().__init__(provider)
def __eq__(self, other):
if (other.location != self.location or
other.pools != self.pools):
return False
return True
@property
def pools(self):
return self._pools
@property
def manage_images(self):
return False
@staticmethod
def reset():
pass
def load(self, config):
default_port_mapping = {
'ssh': 22,
'winrm': 5986,
}
self.zuul_public_key = self.provider['zuul-public-key']
self.location = self.provider['location']
self.subnet_id = self.provider['subnet-id']
self.ipv6 = self.provider.get('ipv6', False)
self.resource_group = self.provider['resource-group']
self.resource_group_location = self.provider['resource-group-location']
self.auth_path = self.provider.get(
'auth-path', os.getenv('AZURE_AUTH_LOCATION', None))
self.cloud_images = {}
for image in self.provider['cloud-images']:
i = AzureProviderCloudImage()
i.name = image['name']
i.username = image['username']
i.key = image.get('key', self.zuul_public_key)
i.image_reference = image['image-reference']
i.connection_type = image.get('connection-type', 'ssh')
i.connection_port = image.get(
'connection-port',
default_port_mapping.get(i.connection_type, 22))
self.cloud_images[i.name] = i
for pool in self.provider.get('pools', []):
pp = AzurePool()
pp.name = pool['name']
pp.provider = self
pp.max_servers = pool['max-servers']
self._pools[pp.name] = pp
pp.labels = {}
for label in pool.get('labels', []):
pl = AzureLabel()
pl.name = label['name']
pl.pool = pp
pp.labels[pl.name] = pl
cloud_image_name = label['cloud-image']
if cloud_image_name:
cloud_image = self.cloud_images.get(
cloud_image_name, None)
if not cloud_image:
raise ValueError(
"cloud-image %s does not exist in provider %s"
" but is referenced in label %s" %
(cloud_image_name, self.name, pl.name))
pl.cloud_image = cloud_image
else:
pl.cloud_image = None
pl.hardware_profile = label['hardware-profile']
config.labels[label['name']].pools.append(pp)
pl.tags = label['tags']
def getSchema(self):
def getSchema():
azure_image_reference = {
v.Required('sku'): str,
v.Required('publisher'): str,
@ -166,38 +54,249 @@ class AzureProviderConfig(ProviderConfig):
v.Required('offer'): str,
}
return {
v.Required('name'): str,
v.Required('username'): str,
# TODO(corvus): make required when zuul_public_key removed
'key': str,
v.Required('image-reference'): azure_image_reference,
'connection-type': str,
'connection-port': int,
'python-path': str,
# TODO(corvus): shell-type
}
class AzureProviderDiskImage(ConfigValue):
    """Provider-level settings for a diskimage built by nodepool.

    Constructing an instance also adds 'vhd' to the image types of the
    `diskimage` object passed in.
    """

    def __init__(self, image, diskimage):
        # Port used when 'connection-port' is not configured; unknown
        # connection types fall back to 22.
        fallback_ports = {
            'ssh': 22,
            'winrm': 5986,
        }
        self.name = image['name']
        diskimage.image_types.add('vhd')
        self.pause = bool(image.get('pause', False))
        self.python_path = image.get('python-path')
        self.username = image.get('username')
        self.key = image.get('key')
        self.connection_type = image.get('connection-type', 'ssh')
        fallback_port = fallback_ports.get(self.connection_type, 22)
        self.connection_port = image.get('connection-port', fallback_port)
        self.meta = {}

    @property
    def external_name(self):
        '''Human readable version of external.'''
        return self.name

    @staticmethod
    def getSchema():
        """Return the schema dict for one provider diskimage entry."""
        schema = {
            v.Required('name'): str,
            'username': str,
            'key': str,
            'pause': bool,
            'connection-type': str,
            'connection-port': int,
            'python-path': str,
            # TODO(corvus): shell-type
        }
        return schema
class AzureLabel(ConfigValue):
ignore_equality = ['pool']
def __init__(self, label, provider_config, provider_pool):
    """Build a label from its config dict.

    `cloud-image` and `diskimage` names, when given, are resolved
    against `provider_config.cloud_images` / `provider_config.diskimages`.

    :raises ValueError: if a referenced cloud-image or diskimage is not
        defined in the provider.
    """
    self.hardware_profile = None
    self.name = label['name']
    self.pool = provider_pool

    cloud_image = None
    cloud_image_name = label.get('cloud-image')
    if cloud_image_name:
        cloud_image = provider_config.cloud_images.get(
            cloud_image_name, None)
        if not cloud_image:
            raise ValueError(
                "cloud-image %s does not exist in provider %s"
                " but is referenced in label %s" %
                (cloud_image_name, provider_config.name, self.name))
    self.cloud_image = cloud_image

    diskimage = None
    diskimage_name = label.get('diskimage')
    if diskimage_name:
        diskimage = provider_config.diskimages.get(
            diskimage_name, None)
        if not diskimage:
            raise ValueError(
                "diskimage %s does not exist in provider %s"
                " but is referenced in label %s" %
                (diskimage_name, provider_config.name, self.name))
    self.diskimage = diskimage

    self.hardware_profile = label['hardware-profile']
    self.tags = label.get('tags', {})
@staticmethod
def getSchema():
azure_hardware_profile = {
v.Required('vm-size'): str,
}
provider_cloud_images = {
return {
v.Required('name'): str,
'username': str,
v.Required('image-reference'): azure_image_reference,
'cloud-image': str,
'diskimage': str,
v.Required('hardware-profile'): azure_hardware_profile,
'tags': dict,
}
azure_label = {
v.Required('name'): str,
v.Required('hardware-profile'): azure_hardware_profile,
v.Required('cloud-image'): str,
v.Optional('tags'): dict,
}
class AzurePool(ConfigPool):
    """Configuration of one pool of an Azure provider.

    Network and connection options that are not set on the pool fall
    back to the values loaded on the provider config.
    """

    ignore_equality = ['provider']

    def __init__(self, provider_config, pool_config):
        super().__init__()
        self.provider = provider_config
        self.load(pool_config)

    def load(self, pool_config):
        """Populate the pool from its config dict.

        :param pool_config: the pool's section of the provider config;
            must contain 'name' and 'max-servers'.
        """
        self.name = pool_config['name']
        self.max_servers = pool_config['max-servers']
        self.public_ipv4 = pool_config.get('public-ipv4',
                                           self.provider.public_ipv4)
        self.public_ipv6 = pool_config.get('public-ipv6',
                                           self.provider.public_ipv6)
        self.ipv4 = pool_config.get('ipv4', self.provider.ipv4)
        self.ipv6 = pool_config.get('ipv6', self.provider.ipv6)
        # A public address of a family implies that family is enabled.
        self.ipv4 = self.ipv4 or self.public_ipv4
        self.ipv6 = self.ipv6 or self.public_ipv6
        # With nothing enabled at all, default to IPv4.
        if not (self.ipv4 or self.ipv6):
            self.ipv4 = True
        self.use_internal_ip = pool_config.get(
            'use-internal-ip', self.provider.use_internal_ip)
        # Fall back to the provider's own host-key-checking value.  This
        # previously read provider.use_internal_ip, which ignored the
        # provider-level 'host-key-checking' setting.
        self.host_key_checking = pool_config.get(
            'host-key-checking', self.provider.host_key_checking)

    @staticmethod
    def getSchema():
        """Return the schema dict for one pool entry."""
        azure_label = AzureLabel.getSchema()

        pool = ConfigPool.getCommonSchemaDict()
        pool.update({
            v.Required('name'): str,
            v.Required('labels'): [azure_label],
            'ipv4': bool,
            'ipv6': bool,
            'public-ipv4': bool,
            'public-ipv6': bool,
            'use-internal-ip': bool,
            'host-key-checking': bool,
        })
        return pool
class AzureProviderConfig(ProviderConfig):
def __init__(self, driver, provider):
super().__init__(provider)
self._pools = {}
self.rate_limit = None
self.launch_retries = None
@property
def pools(self):
return self._pools
@property
def manage_images(self):
return True
@staticmethod
def reset():
pass
def load(self, config):
    """Populate this provider config from `self.provider`.

    Reads the provider-level options, builds the cloud-image and
    diskimage tables, then creates each pool and its labels.  Every pool
    is also appended to the `pools` list of each of its labels in
    `config.labels`.

    :param config: the top-level config; its `diskimages` and `labels`
        mappings are consulted.
    """
    self.image_type = 'vhd'
    self.image_name_format = '{image_name}-{timestamp}'
    self.post_upload_hook = self.provider.get('post-upload-hook')

    self.rate_limit = self.provider.get('rate-limit', 1)
    self.launch_retries = self.provider.get('launch-retries', 3)
    self.boot_timeout = self.provider.get('boot-timeout', 60)

    # TODO(corvus): remove
    self.zuul_public_key = self.provider.get('zuul-public-key')
    self.location = self.provider['location']
    self.subnet_id = self.provider.get('subnet-id')
    self.network = self.provider.get('network')
    # Don't use these directly; these are default values for
    # labels.
    # TODO: change default to false
    self.public_ipv4 = self.provider.get('public-ipv4', True)
    self.public_ipv6 = self.provider.get('public-ipv6', False)
    self.ipv4 = self.provider.get('ipv4', None)
    self.ipv6 = self.provider.get('ipv6', None)
    self.use_internal_ip = self.provider.get('use-internal-ip', False)
    self.host_key_checking = self.provider.get('host-key-checking', True)
    self.resource_group = self.provider['resource-group']
    self.resource_group_location = self.provider['resource-group-location']
    # An explicit 'auth-path' wins over the AZURE_AUTH_LOCATION variable.
    env_auth_path = os.getenv('AZURE_AUTH_LOCATION', None)
    self.auth_path = self.provider.get('auth-path', env_auth_path)

    self.cloud_images = {}
    for image_config in self.provider.get('cloud-images', []):
        cloud_image = AzureProviderCloudImage(
            image_config, self.zuul_public_key)
        self.cloud_images[cloud_image.name] = cloud_image

    self.diskimages = {}
    for image_config in self.provider.get('diskimages', []):
        builder_image = config.diskimages[image_config['name']]
        provider_image = AzureProviderDiskImage(image_config, builder_image)
        self.diskimages[provider_image.name] = provider_image

    for pool_config in self.provider.get('pools', []):
        azure_pool = AzurePool(self, pool_config)
        self._pools[azure_pool.name] = azure_pool

        for label_config in pool_config.get('labels', []):
            azure_label = AzureLabel(label_config, self, azure_pool)
            azure_pool.labels[azure_label.name] = azure_label
            config.labels[azure_label.name].pools.append(azure_pool)
def getSchema(self):
provider_cloud_images = AzureProviderCloudImage.getSchema()
provider_diskimages = AzureProviderDiskImage.getSchema()
pool = AzurePool.getSchema()
provider = ProviderConfig.getCommonSchemaDict()
provider.update({
v.Required('zuul-public-key'): str,
v.Required('pools'): [pool],
v.Required('location'): str,
v.Required('resource-group'): str,
v.Required('resource-group-location'): str,
v.Required('subnet-id'): str,
v.Required('cloud-images'): [provider_cloud_images],
v.Optional('auth-path'): str,
'subnet-id': str,
'network': v.Any(str, {
'resource-group': str,
'network': str,
'subnet': str,
}),
'cloud-images': [provider_cloud_images],
'diskimages': [provider_diskimages],
v.Required('auth-path'): str,
'ipv4': bool,
'ipv6': bool,
'public-ipv4': bool,
'public-ipv6': bool,
'use-internal-ip': bool,
'host-key-checking': bool,
# TODO: remove
'zuul-public-key': str,
})
return v.Schema(provider)

View File

@ -1,147 +0,0 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import time
from nodepool import exceptions
from nodepool import zk
from nodepool.driver.utils import NodeLauncher
from nodepool.driver import NodeRequestHandler
from nodepool import nodeutils as utils
# Launcher that creates one Azure instance for a node, waits for it to be
# provisioned, scans its host key and records the result on the node.
class AzureInstanceLauncher(NodeLauncher):
# retries: number of createInstance attempts; boot_timeout: seconds to wait
# for provisioningState to become 'Succeeded'.
def __init__(
self, handler, node, provider_config,
label, retries=3, boot_timeout=120):
super().__init__(handler, node, provider_config)
self.retries = retries
self.handler = handler
self.label = label
self.boot_timeout = boot_timeout
self.zk = handler.zk
# Create the instance, wait for it to boot, key-scan it and store the node
# as READY.  Raises LaunchStatusException if the instance does not reach
# 'Succeeded' or has no IP, and LaunchKeyscanException if the scan fails.
def launch(self):
self.log.debug("Starting %s instance" % self.node.type)
attempts = 1
hostname = '{label.name}-{provider.name}-{node.id}'.format(
label=self.label, provider=self.provider_config, node=self.node
)
# Retry instance creation up to self.retries times, one second apart.
# NOTE(review): if retries is 0 the loop never runs and `instance` is
# unbound below -- confirm callers never pass 0.
while attempts <= self.retries:
try:
instance = self.handler.manager.createInstance(
hostname, self.label, self.node.id,
nodepool_node_label=self.label.name)
break
except Exception:
# NOTE(review): this condition repeats the loop condition, so it looks
# always true here.
if attempts <= self.retries:
self.log.exception(
"Launch attempt %d/%d failed for node %s:",
attempts, self.retries, self.node.id)
# Last attempt failed: propagate the error.
if attempts == self.retries:
raise
attempts += 1
time.sleep(1)
# The hostname doubles as the node's external id.
self.node.external_id = hostname
boot_start = time.monotonic()
# Poll the provisioning state every half second until the timeout.
while time.monotonic() - boot_start < self.boot_timeout:
state = instance['properties']['provisioningState']
self.log.debug("Instance %s is %s" % (hostname, state))
if state == 'Succeeded':
break
time.sleep(0.5)
instance = self.handler.manager.getInstance(hostname)
# NOTE(review): `state` is only assigned inside the loop above; with a
# non-positive boot_timeout it would be unbound here.
if state != 'Succeeded':
raise exceptions.LaunchStatusException(
"Instance %s failed to start: %s" % (hostname, state))
server_ip = self.handler.manager.getIpaddress(instance)
if self.provider_config.ipv6:
server_v6_ip = self.handler.manager.getv6Ipaddress(instance)
if not server_ip:
raise exceptions.LaunchStatusException(
"Instance %s doesn't have a public ip" % hostname)
# The scan always uses port 22, regardless of the image's settings.
try:
key = utils.nodescan(server_ip, port=22, timeout=180)
except Exception:
raise exceptions.LaunchKeyscanException(
"Can't scan instance %s key" % hostname)
self.log.info("Instance %s ready" % hostname)
# Record connection details on the node and persist it.
self.node.state = zk.READY
self.node.hostname = server_ip
self.node.interface_ip = server_ip
self.node.public_ipv4 = server_ip
if self.provider_config.ipv6:
self.node.public_ipv6 = server_v6_ip
self.node.host_keys = key
# Connection type and port are fixed to ssh/22 here.
self.node.connection_port = 22
self.node.connection_type = "ssh"
self.node.username = self.label.cloud_image.username
self.zk.storeNode(self.node)
self.log.info("Instance %s is ready", hostname)
class AzureNodeRequestHandler(NodeRequestHandler):
    """Request handler that starts one launcher thread per node."""

    log = logging.getLogger(
        "nodepool.driver.azure.AzureNodeRequestHandler")

    def __init__(self, pw, request):
        super().__init__(pw, request)
        # Launcher threads started by launch().
        self._threads = []

    @property
    def alive_thread_count(self):
        """Number of launcher threads that are still running."""
        return sum(1 for thread in self._threads if thread.is_alive())

    def launchesComplete(self):
        '''
        Check if all launch requests have completed.

        When all of the Node objects have reached a final state (READY or
        FAILED), we'll know all threads have finished the launch process.
        '''
        # Nothing was launched, so there is nothing to wait for.
        if not self._threads:
            return True

        # Give the NodeLaunch threads time to finish.
        if self.alive_thread_count:
            return False

        final_states = (zk.READY, zk.FAILED)
        node_states = [node.state for node in self.nodeset]

        # NOTE: It very important that NodeLauncher always sets one of
        # these states, no matter what.
        return all(state in final_states for state in node_states)

    def launch(self, node):
        """Start a launcher thread for `node` and remember it."""
        node_label = self.pool.labels[node.type[0]]
        launcher = AzureInstanceLauncher(
            self, node, self.provider, node_label)
        launcher.start()
        self._threads.append(launcher)

    def imagesAvailable(self):
        """Always report images as available."""
        return True

View File

@ -1,390 +0,0 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import json
from nodepool.driver import Provider
from nodepool.driver.utils import NodeDeleter
from nodepool.driver.azure import handler
from nodepool import zk
from . import azul
class AzureProvider(Provider):
log = logging.getLogger("nodepool.driver.azure.AzureProvider")
def __init__(self, provider, *args):
    """Keep the provider config and copy frequently used settings.

    Additional positional arguments are accepted and ignored.
    """
    self.provider = provider
    self.zuul_public_key = provider.zuul_public_key
    self.resource_group = provider.resource_group
    self.resource_group_location = provider.resource_group_location
    # Set by start().
    self._zk = None
def start(self, zk_conn):
    """Store the ZooKeeper connection and create the Azure client.

    The credential is read as JSON from the provider's `auth_path`.
    """
    self.log.debug("Starting")
    self._zk = zk_conn
    auth_path = self.provider.auth_path
    self.log.debug(
        "Using %s as auth_path for Azure auth" % auth_path)
    with open(auth_path) as credential_file:
        credential = json.load(credential_file)
        self.azul = azul.AzureCloud(credential)
def stop(self):
    """Log that the provider is stopping; nothing else is torn down."""
    self.log.debug("Stopping")
def listNodes(self):
    """Return the virtual machines in the provider's resource group."""
    virtual_machines = self.azul.virtual_machines
    return virtual_machines.list(self.resource_group)
def listNICs(self):
    """Return the network interfaces in the provider's resource group."""
    network_interfaces = self.azul.network_interfaces
    return network_interfaces.list(self.resource_group)
def listPIPs(self):
    """Return the public IP addresses in the provider's resource group."""
    public_ips = self.azul.public_ip_addresses
    return public_ips.list(self.resource_group)
def listDisks(self):
    """Return the disks in the provider's resource group."""
    disks = self.azul.disks
    return disks.list(self.resource_group)
def labelReady(self, name):
    """Report every label as ready; `name` is not inspected."""
    return True
def join(self):
    """Return True immediately; there is nothing to wait for here."""
    return True
def getRequestHandler(self, poolworker, request):
    """Return an AzureNodeRequestHandler for `request`."""
    request_handler = handler.AzureNodeRequestHandler(poolworker, request)
    return request_handler
def cleanupLeakedResources(self):
    """Run the leak cleanup for nodes, NICs, public IPs and disks, in
    that order."""
    cleanup_steps = (
        self._cleanupLeakedNodes,
        self._cleanupLeakedNICs,
        self._cleanupLeakedPIPs,
        self._cleanupLeakedDisks,
    )
    for cleanup in cleanup_steps:
        cleanup()
def _cleanupLeakedDisks(self):
    """Delete disks tagged for this provider whose node no longer exists.

    Disks without tags, without a `nodepool_provider_name` tag, or tagged
    for a different provider name are left alone.  A failed delete is
    logged and does not stop the loop.
    """
    for disk in self.listDisks():
        tags = disk['tags']
        # Nothing to check ownership against, move on
        if tags is None or 'nodepool_provider_name' not in tags:
            continue
        # Another launcher, sharing this provider but configured
        # with a different name, owns this.
        if tags['nodepool_provider_name'] != self.provider.name:
            continue
        # The owning node is still known; not a leak.
        if self._zk.getNode(tags['nodepool_id']):
            continue
        self.log.warning(
            "Marking for delete leaked Disk %s (%s) in %s "
            "(unknown node id %s)",
            disk['name'], disk['id'], self.provider.name,
            tags['nodepool_id']
        )
        try:
            pending = self.azul.disks.delete(
                self.resource_group, disk['name'])
            self.azul.wait_for_async_operation(pending)
        except azul.AzureError as e:
            self.log.warning(
                "Failed to cleanup Disk %s (%s). Error: %r",
                disk['name'], disk['id'], e
            )
def _cleanupLeakedNICs(self):
    """Delete NICs tagged for this provider whose node no longer exists.

    NICs without tags, without a `nodepool_provider_name` tag, or tagged
    for a different provider name are left alone.  A failed delete is
    logged and does not stop the loop.
    """
    for nic in self.listNICs():
        tags = nic['tags']
        # Nothing to check ownership against, move on
        if tags is None or 'nodepool_provider_name' not in tags:
            continue
        # Another launcher, sharing this provider but configured
        # with a different name, owns this.
        if tags['nodepool_provider_name'] != self.provider.name:
            continue
        # The owning node is still known; not a leak.
        if self._zk.getNode(tags['nodepool_id']):
            continue
        self.log.warning(
            "Marking for delete leaked NIC %s (%s) in %s "
            "(unknown node id %s)",
            nic['name'], nic['id'], self.provider.name,
            tags['nodepool_id']
        )
        try:
            pending = self.azul.network_interfaces.delete(
                self.resource_group, nic['name'])
            self.azul.wait_for_async_operation(pending)
        except azul.AzureError as e:
            self.log.warning(
                "Failed to cleanup NIC %s (%s). Error: %r",
                nic['name'], nic['id'], e
            )
def _cleanupLeakedPIPs(self):
    """Delete public IPs tagged for this provider whose node is gone.

    Addresses without tags, without a `nodepool_provider_name` tag, or
    tagged for a different provider name are left alone.  A failed delete
    is logged and does not stop the loop.
    """
    for pip in self.listPIPs():
        tags = pip['tags']
        # Nothing to check ownership against, move on
        if tags is None or 'nodepool_provider_name' not in tags:
            continue
        # Another launcher, sharing this provider but configured
        # with a different name, owns this.
        if tags['nodepool_provider_name'] != self.provider.name:
            continue
        # The owning node is still known; not a leak.
        if self._zk.getNode(tags['nodepool_id']):
            continue
        self.log.warning(
            "Marking for delete leaked PIP %s (%s) in %s "
            "(unknown node id %s)",
            pip['name'], pip['id'], self.provider.name,
            tags['nodepool_id']
        )
        try:
            pending = self.azul.public_ip_addresses.delete(
                self.resource_group, pip['name'])
            self.azul.wait_for_async_operation(pending)
        except azul.AzureError as e:
            self.log.warning(
                "Failed to cleanup IP %s (%s). Error: %r",
                pip['name'], pip['id'], e
            )
def _cleanupLeakedNodes(self):
    """Record leaked virtual machines in ZooKeeper so they get deleted.

    A VM is leaked when it carries this provider's
    ``nodepool_provider_name`` tag but ZooKeeper has no node for its
    ``nodepool_id`` tag.  For each such VM a new node in DELETING state
    is stored, with the VM name as its external id.
    """
    # External ids of this provider's nodes that are already being
    # deleted.
    deleting_ids = {
        node.external_id
        for node in self._zk.nodeIterator()
        if node.state == zk.DELETING and node.provider == self.provider.name
    }

    for n in self.listNodes():
        tags = n['tags']
        if tags is None or 'nodepool_provider_name' not in tags:
            # Nothing to check ownership against, move on
            continue
        if tags['nodepool_provider_name'] != self.provider.name:
            # Another launcher, sharing this provider but configured
            # with a different name, owns this.
            continue
        # The records stored below use the VM *name* as external id, so
        # the name must be compared here; comparing only the resource
        # id never matched and produced a duplicate DELETING record on
        # every pass.  The id is still accepted for older records.
        if n['name'] in deleting_ids or n['id'] in deleting_ids:
            # Already deleting this node
            continue
        if not self._zk.getNode(tags['nodepool_id']):
            self.log.warning(
                "Marking for delete leaked instance %s (%s) in %s "
                "(unknown node id %s)",
                n['name'], n['id'], self.provider.name,
                tags['nodepool_id']
            )
            node = zk.Node()
            node.external_id = n['name']
            node.provider = self.provider.name
            node.state = zk.DELETING
            self._zk.storeNode(node)
def startNodeCleanup(self, node):
    """Create a NodeDeleter for ``node``, start it and return it."""
    deleter = NodeDeleter(self._zk, self, node)
    deleter.start()
    return deleter
def cleanupNode(self, server_id):
    """Delete a virtual machine and the resources created alongside it.

    Deletes, in order and waiting for each: the VM, its ``-nic``
    interface, its ``-nic-pip`` public address (and ``-nic-v6-pip``
    when the provider has IPv6 enabled), then every disk whose
    ``nodepool_id`` tag matches the VM's.

    :param server_id: Name of the virtual machine in the resource group.

    Returns without doing anything when the VM lookup reports 404.  On
    any other lookup error the deletes are still attempted, but disk
    cleanup is skipped because the owning node id is unknown.
    """
    self.log.debug('Server ID: %s' % server_id)
    # Node id used to find the VM's disks; stays None when the VM (or
    # its tag) could not be read.
    nodepool_id = None
    try:
        vm = self.azul.virtual_machines.get(
            self.resource_group, server_id)
    except azul.AzureError as e:
        if e.status_code == 404:
            return
        self.log.warning(
            "Failed to cleanup node %s. Error: %r",
            server_id, e
        )
    else:
        nodepool_id = (vm.get('tags') or {}).get('nodepool_id')

    self.azul.wait_for_async_operation(
        self.azul.virtual_machines.delete(
            self.resource_group, server_id))

    self.azul.wait_for_async_operation(
        self.azul.network_interfaces.delete(
            self.resource_group, "%s-nic" % server_id))

    self.azul.wait_for_async_operation(
        self.azul.public_ip_addresses.delete(
            self.resource_group,
            "%s-nic-pip" % server_id))

    if self.provider.ipv6:
        self.azul.wait_for_async_operation(
            self.azul.public_ip_addresses.delete(
                self.resource_group,
                "%s-nic-v6-pip" % server_id))

    if nodepool_id is None:
        # Without a node id no disk can be safely matched to this VM;
        # matching on None could select unrelated untagged disks.
        return

    # Start every disk delete first, then wait for all of them.
    disk_handle_list = [
        self.azul.disks.delete(self.resource_group, disk['name'])
        for disk in self.listDisks()
        if disk['tags'] is not None
        and disk['tags'].get('nodepool_id') == nodepool_id
    ]
    for async_disk_delete in disk_handle_list:
        self.azul.wait_for_async_operation(async_disk_delete)
def waitForNodeCleanup(self, server_id):
    """Report that cleanup of ``server_id`` is complete.

    Always returns True: cleanupNode already waits for every async
    delete it starts, so there is nothing left to wait for here.
    """
    return True
def getInstance(self, server_id):
    """Fetch the virtual machine named ``server_id`` from the resource group."""
    machines = self.azul.virtual_machines
    return machines.get(self.resource_group, server_id)
def createInstance(
        self, hostname, label, nodepool_id, nodepool_node_label=None):
    """Create a virtual machine together with its public IP and NIC.

    Creates (or updates) the resource group, then a dynamic IPv4 public
    address named ``<hostname>-nic-pip``, a NIC named
    ``<hostname>-nic`` and finally the VM itself.

    :param hostname: Name of the VM; also the prefix of the NIC and
        public IP names.
    :param label: Label configuration; ``tags``, ``cloud_image`` and
        ``hardware_profile`` are read from it.
    :param nodepool_id: Node id stored in the ``nodepool_id`` tag of
        the public IP, NIC and VM.
    :param nodepool_node_label: Optional value for the
        ``nodepool_node_label`` tag.
    :returns: The response of the virtual machine create call.
    """
    self.log.debug("Create resouce group")

    # Work on a copy: label.tags is shared configuration, and writing
    # into it would leak this node's id into every later launch that
    # uses the same label.
    tags = dict(label.tags or {})
    tags['nodepool_provider_name'] = self.provider.name
    if nodepool_node_label:
        tags['nodepool_node_label'] = nodepool_node_label

    # The resource group is shared between nodes, so it gets the tags
    # without any node id.
    self.azul.resource_groups.create(
        self.resource_group, {
            'location': self.provider.resource_group_location,
            'tags': dict(tags),
        })
    tags['nodepool_id'] = nodepool_id

    v4_params_create = {
        'location': self.provider.location,
        'tags': tags,
        'properties': {
            'publicIpAllocationMethod': 'dynamic',
        },
    }
    v4_public_ip = self.azul.public_ip_addresses.create(
        self.resource_group,
        "%s-nic-pip" % hostname,
        v4_params_create,
    )

    nic_data = {
        'location': self.provider.location,
        'tags': tags,
        'properties': {
            'ipConfigurations': [{
                'name': "nodepool-v4-ip-config",
                'properties': {
                    'privateIpAddressVersion': 'IPv4',
                    'subnet': {
                        'id': self.provider.subnet_id
                    },
                    'publicIpAddress': {
                        'id': v4_public_ip['id']
                    }
                }
            }]
        }
    }

    if self.provider.ipv6:
        # Second IP configuration: private IPv6 on the same subnet.
        nic_data['properties']['ipConfigurations'].append({
            'name': "zuul-v6-ip-config",
            'properties': {
                'privateIpAddressVersion': 'IPv6',
                'subnet': {
                    'id': self.provider.subnet_id
                }
            }
        })

    nic = self.azul.network_interfaces.create(
        self.resource_group,
        "%s-nic" % hostname,
        nic_data
    )

    vm = self.azul.virtual_machines.create(
        self.resource_group, hostname, {
            'location': self.provider.location,
            'tags': tags,
            'properties': {
                'osProfile': {
                    'computerName': hostname,
                    'adminUsername': label.cloud_image.username,
                    'linuxConfiguration': {
                        'ssh': {
                            'publicKeys': [{
                                'path': "/home/%s/.ssh/authorized_keys" % (
                                    label.cloud_image.username),
                                'keyData': label.cloud_image.key,
                            }]
                        },
                        "disablePasswordAuthentication": True,
                    }
                },
                'hardwareProfile': {
                    'vmSize': label.hardware_profile["vm-size"]
                },
                'storageProfile': {
                    'imageReference': label.cloud_image.image_reference
                },
                'networkProfile': {
                    'networkInterfaces': [{
                        'id': nic['id'],
                        'properties': {
                            'primary': True,
                        }
                    }]
                },
            },
        })
    return vm
def getIpaddress(self, instance):
    """Return the public IP address of ``instance``'s first NIC.

    Follows the VM's first network interface reference to the NIC, then
    the NIC's first IP configuration to its public IP address resource.
    """
    # Copied from https://github.com/Azure/azure-sdk-for-python/issues/897
    # Resource ids are split on '/'; element 4 is used as the resource
    # group and element 8 as the resource name (presumably ids of the
    # form /subscriptions/<sub>/resourceGroups/<group>/providers/
    # <namespace>/<type>/<name> -- confirm against the API).
    nic_ref = instance['properties']['networkProfile']['networkInterfaces'][0]
    nic_parts = nic_ref['id'].split('/')
    nic = self.azul.network_interfaces.get(nic_parts[4], nic_parts[8])

    first_config = nic['properties']['ipConfigurations'][0]
    pip_parts = first_config['properties']['publicIPAddress']['id'].split('/')
    pip = self.azul.public_ip_addresses.get(pip_parts[4], pip_parts[8])
    return pip['properties']['ipAddress']
def getv6Ipaddress(self, instance):
    """Return the private address of the second IP configuration.

    Looks up the VM's first network interface and reads
    ``privateIPAddress`` from its second IP configuration.
    """
    # Copied from https://github.com/Azure/azure-sdk-for-python/issues/897
    nic_ref = instance['properties']['networkProfile']['networkInterfaces'][0]
    nic_parts = nic_ref['id'].split('/')
    # Element 4 of the split id is used as the resource group, element
    # 8 as the NIC name.
    nic = self.azul.network_interfaces.get(nic_parts[4], nic_parts[8])
    second_config = nic['properties']['ipConfigurations'][1]
    return second_config['properties']['privateIPAddress']

View File

@ -1,25 +0,0 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nodepool.driver.statemachine import StateMachineDriver
from nodepool.driver.azurestate.config import AzureProviderConfig
from nodepool.driver.azurestate.adapter import AzureAdapter
class AzureDriver(StateMachineDriver):
    """State machine driver entry point for Azure."""

    def getProviderConfig(self, provider):
        """Wrap the raw ``provider`` configuration in an AzureProviderConfig."""
        return AzureProviderConfig(self, provider)

    def getAdapter(self, provider_config):
        """Build the AzureAdapter for ``provider_config``."""
        return AzureAdapter(provider_config)

View File

@ -1,397 +0,0 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import concurrent.futures
import logging
import time
import requests
class AzureAuth(requests.auth.AuthBase):
    """Requests auth hook that attaches an Azure OAuth2 bearer token.

    The token is obtained with the client-credentials grant and is
    refreshed when it is within 60 seconds of its expiration.
    """
    AUTH_URL = "https://login.microsoftonline.com/{tenantId}/oauth2/token"
    # Seconds to wait on the token endpoint.  Without a timeout a
    # stalled connection would block every API call forever.
    TIMEOUT = 60

    def __init__(self, credential):
        """
        :param credential: Mapping supplying ``tenantId``, ``clientId``
            and ``clientSecret``.
        """
        self.log = logging.getLogger("azul.auth")
        self.credential = credential
        self.token = None
        # Starting at "now" forces a refresh on first use.
        self.expiration = time.time()

    def refresh(self):
        """Fetch a new token if the current one expires within 60 seconds."""
        if self.expiration - time.time() >= 60:
            return
        self.log.debug('Refreshing authentication token')
        url = self.AUTH_URL.format(**self.credential)
        data = {
            'grant_type': 'client_credentials',
            'client_id': self.credential['clientId'],
            'client_secret': self.credential['clientSecret'],
            'resource': 'https://management.azure.com/',
        }
        r = requests.post(url, data, timeout=self.TIMEOUT)
        ret = r.json()
        self.token = ret['access_token']
        self.expiration = float(ret['expires_on'])

    def __call__(self, r):
        """Add the ``authorization`` header to the outgoing request ``r``."""
        self.refresh()
        r.headers["authorization"] = "Bearer " + self.token
        return r
class AzureError(Exception):
    """Error response from the Azure API.

    ``status_code`` and ``error_code`` are stored as attributes;
    ``message`` becomes the exception text.
    """

    def __init__(self, status_code, error_code, message):
        Exception.__init__(self, message)
        self.status_code = status_code
        self.error_code = error_code
class AzureNotFoundError(AzureError):
    """AzureError subtype for "not found" responses."""
class AzureCRUD:
    """Base helper that builds resource URLs and issues CRUD requests.

    Subclasses set ``base_url`` to a format template.  The template
    fields are filled from the keyword arguments given at construction,
    the cloud's credential mapping and per-call overrides, in that
    order of increasing precedence.
    """
    base_subscription_url = (
        'https://management.azure.com/subscriptions/{subscriptionId}/')
    base_url = ''

    def __init__(self, cloud, **kw):
        self.cloud = cloud
        self.args = dict(kw)
        self.args.update(self.cloud.credential)

    def _render(self, template, overrides):
        # Fill a URL template without touching the stored defaults.
        values = dict(self.args)
        values.update(overrides)
        return template.format(**values)

    def url(self, endpoint=None, **kw):
        """Return the resource URL, optionally with ``/endpoint`` appended."""
        suffix = '' if endpoint is None else '/' + endpoint
        template = ''.join([
            self.base_subscription_url,
            self.base_url,
            suffix,
            '?api-version={apiVersion}',
        ])
        return self._render(template, kw)

    def id_url(self, url, **kw):
        """Return the URL for a resource id path such as ``/subscriptions/...``."""
        template = ('https://management.azure.com' + url
                    + '?api-version={apiVersion}')
        return self._render(template, kw)

    def get_by_id(self, resource_id):
        return self.cloud.get(self.id_url(resource_id))

    def _list(self, **kw):
        first_page = self.cloud.get(self.url(**kw))
        return self.cloud.paginate(first_page)

    def list(self):
        return self._list()

    def _get(self, **kw):
        return self.cloud.get(self.url(**kw))

    def _create(self, params, **kw):
        return self.cloud.put(self.url(**kw), params)

    def _delete(self, **kw):
        return self.cloud.delete(self.url(**kw))

    def _post(self, endpoint, params, **kw):
        return self.cloud.post(self.url(endpoint=endpoint, **kw), params)
class AzureResourceGroupsCRUD(AzureCRUD):
    """CRUD operations on resource groups, addressed by group name."""
    base_url = 'resourcegroups/{resourceGroupName}'

    def list(self):
        # An empty name addresses the collection itself.
        return self._list(resourceGroupName='')

    def get(self, name):
        target = {'resourceGroupName': name}
        return self._get(**target)

    def create(self, name, params):
        target = {'resourceGroupName': name}
        return self._create(params, **target)

    def delete(self, name):
        target = {'resourceGroupName': name}
        return self._delete(**target)
class AzureResourceProviderCRUD(AzureCRUD):
    """CRUD operations on a provider resource inside a resource group.

    The ``providerId`` and ``resource`` template fields are expected to
    be supplied when the instance is constructed.
    """
    base_url = (
        '/resourceGroups/{resourceGroupName}/providers/'
        '{providerId}/{resource}/{resourceName}')

    def list(self, resource_group_name):
        # An empty resource name addresses the whole collection.
        target = dict(resourceGroupName=resource_group_name, resourceName='')
        return self._list(**target)

    def get(self, resource_group_name, name):
        target = dict(resourceGroupName=resource_group_name,
                      resourceName=name)
        return self._get(**target)

    def create(self, resource_group_name, name, params):
        target = dict(resourceGroupName=resource_group_name,
                      resourceName=name)
        return self._create(params, **target)

    def delete(self, resource_group_name, name):
        target = dict(resourceGroupName=resource_group_name,
                      resourceName=name)
        return self._delete(**target)

    def post(self, resource_group_name, name, endpoint, params):
        target = dict(resourceGroupName=resource_group_name,
                      resourceName=name)
        return self._post(endpoint, params, **target)
class AzureNetworkCRUD(AzureCRUD):
    """CRUD operations on a sub-resource of a virtual network.

    The ``resource`` template field is expected to be supplied when the
    instance is constructed.
    """
    base_url = (
        '/resourceGroups/{resourceGroupName}/providers/'
        'Microsoft.Network/virtualNetworks/{virtualNetworkName}/'
        '{resource}/{resourceName}')

    def list(self, resource_group_name, virtual_network_name):
        # An empty resource name addresses the whole collection.
        target = dict(resourceGroupName=resource_group_name,
                      virtualNetworkName=virtual_network_name,
                      resourceName='')
        return self._list(**target)

    def get(self, resource_group_name, virtual_network_name, name):
        target = dict(resourceGroupName=resource_group_name,
                      virtualNetworkName=virtual_network_name,
                      resourceName=name)
        return self._get(**target)

    def create(self, resource_group_name, virtual_network_name, name, params):
        target = dict(resourceGroupName=resource_group_name,
                      virtualNetworkName=virtual_network_name,
                      resourceName=name)
        return self._create(params, **target)

    def delete(self, resource_group_name, virtual_network_name, name):
        target = dict(resourceGroupName=resource_group_name,
                      virtualNetworkName=virtual_network_name,
                      resourceName=name)
        return self._delete(**target)
class AzureLocationCRUD(AzureCRUD):
    """Listing of a provider resource scoped to a location."""
    base_url = (
        '/providers/{providerId}/locations/{location}/{resource}')

    def list(self, location):
        target = {'location': location}
        return self._list(**target)
class AzureProviderCRUD(AzureCRUD):
    """Listing of a subscription-wide provider resource."""
    base_url = '/providers/{providerId}/{resource}/'

    def list(self):
        # No per-call template fields are needed for this URL.
        return self._list()
class AzureDictResponse(dict):
    """Dict payload that also carries the HTTP response it came from."""

    def __init__(self, response, *args):
        dict.__init__(self, *args)
        # Kept so callers can read the response headers later.
        self.response = response
        # Timestamp read and updated by the async-operation polling code.
        self.last_retry = time.time()
class AzureListResponse(list):
    """List payload that also carries the HTTP response it came from."""

    def __init__(self, response, *args):
        list.__init__(self, *args)
        # Kept so callers can read the response headers later.
        self.response = response
        # Timestamp read and updated by the async-operation polling code.
        self.last_retry = time.time()
class AzureCloud:
    """Small REST client for the Azure management API.

    Exposes one CRUD helper per resource type as an attribute and
    provides the HTTP verbs, pagination, async-operation polling and
    page-blob upload used by those helpers and their callers.
    """
    # Timeout, in seconds, passed to every HTTP request.
    TIMEOUT = 60

    def __init__(self, credential):
        """
        :param credential: Credential mapping; it is also used to fill
            URL template fields such as ``subscriptionId``.
        """
        self.credential = credential
        self.session = requests.Session()
        self.log = logging.getLogger("azul")
        self.auth = AzureAuth(credential)
        self.network_interfaces = AzureResourceProviderCRUD(
            self,
            providerId='Microsoft.Network',
            resource='networkInterfaces',
            apiVersion='2020-07-01')
        self.public_ip_addresses = AzureResourceProviderCRUD(
            self,
            providerId='Microsoft.Network',
            resource='publicIPAddresses',
            apiVersion='2020-07-01')
        self.virtual_machines = AzureResourceProviderCRUD(
            self,
            providerId='Microsoft.Compute',
            resource='virtualMachines',
            apiVersion='2020-12-01')
        self.disks = AzureResourceProviderCRUD(
            self,
            providerId='Microsoft.Compute',
            resource='disks',
            apiVersion='2020-06-30')
        self.images = AzureResourceProviderCRUD(
            self,
            providerId='Microsoft.Compute',
            resource='images',
            apiVersion='2020-12-01')
        self.resource_groups = AzureResourceGroupsCRUD(
            self,
            apiVersion='2020-06-01')
        self.subnets = AzureNetworkCRUD(
            self,
            resource='subnets',
            apiVersion='2020-07-01')
        self.compute_usages = AzureLocationCRUD(
            self,
            providerId='Microsoft.Compute',
            resource='usages',
            apiVersion='2020-12-01')
        self.compute_skus = AzureProviderCRUD(
            self,
            providerId='Microsoft.Compute',
            resource='skus',
            apiVersion='2019-04-01')

    # The ``codes`` defaults are tuples so they cannot be modified by
    # accident; any container supporting ``in`` is accepted.
    def get(self, url, codes=(200,)):
        """GET ``url``; see request() for the return value."""
        return self.request('GET', url, None, codes)

    def put(self, url, data, codes=(200, 201, 202)):
        """PUT ``data`` as JSON to ``url``; see request()."""
        return self.request('PUT', url, data, codes)

    def post(self, url, data, codes=(200, 202)):
        """POST ``data`` as JSON to ``url``; see request()."""
        return self.request('POST', url, data, codes)

    def delete(self, url, codes=(200, 201, 202, 204)):
        """DELETE ``url``; see request()."""
        return self.request('DELETE', url, None, codes)

    def _parse_error(self, response):
        # Return (error_code, message) from an error response.  Falls
        # back to (None, raw body) when the body is not the expected
        # {"error": {"code": ..., "message": ...}} JSON, so the caller
        # can still raise an AzureError carrying the HTTP status.
        try:
            err = response.json()['error']
            return err.get('code'), err.get('message', response.text)
        except (ValueError, KeyError, TypeError, AttributeError):
            return None, response.text

    def request(self, method, url, data, codes):
        """Perform an authenticated request and decode the JSON reply.

        :param method: HTTP method name.
        :param url: Absolute URL.
        :param data: Object sent as the JSON body, or None.
        :param codes: Status codes treated as success.
        :returns: AzureListResponse or AzureDictResponse wrapping the
            decoded body (an empty dict response when the body is
            empty).
        :raises AzureNotFoundError: on a 404 outside ``codes``.
        :raises AzureError: on any other status outside ``codes``.
        """
        self.log.debug('%s: %s %s' % (method, url, data))
        response = self.session.request(
            method, url, json=data,
            auth=self.auth, timeout=self.TIMEOUT,
            headers={'Accept': 'application/json',
                     'Accept-Encoding': 'gzip'})

        self.log.debug("Received headers: %s", response.headers)
        if response.status_code in codes:
            if len(response.text):
                self.log.debug("Received: %s", response.text)
                ret_data = response.json()
                if isinstance(ret_data, list):
                    return AzureListResponse(response, ret_data)
                return AzureDictResponse(response, ret_data)
            self.log.debug("Empty response")
            return AzureDictResponse(response, {})

        self.log.error(response.text)
        error_code, message = self._parse_error(response)
        if response.status_code == 404:
            raise AzureNotFoundError(
                response.status_code, error_code, message)
        raise AzureError(response.status_code, error_code, message)

    def paginate(self, data):
        """Collect ``value`` from ``data`` and every ``nextLink`` page."""
        ret = data['value']
        while 'nextLink' in data:
            data = self.get(data['nextLink'])
            ret += data['value']
        return ret

    def check_async_operation(self, response):
        """Poll the async operation started by ``response`` once.

        The status URL is taken from the ``Azure-AsyncOperation``
        header, falling back to ``Location``.  Sleeps until
        ``Retry-After`` seconds (default 2) have passed since the last
        poll.

        :returns: The status document, or None when the response names
            no async operation.
        """
        resp = response.response
        location = resp.headers.get(
            'Azure-AsyncOperation',
            resp.headers.get('Location', None))
        if not location:
            self.log.debug("No async operation found")
            return None
        remain = (response.last_retry +
                  float(resp.headers.get('Retry-After', 2))) - time.time()
        self.log.debug("remain time %s", remain)
        if remain > 0:
            time.sleep(remain)
        response.last_retry = time.time()
        return self.get(location)

    def wait_for_async_operation(self, response, timeout=600):
        """Block until the async operation behind ``response`` finishes.

        :returns: The final status document on success, or None when
            there was no async operation to wait for.
        :raises Exception: on timeout, or when the operation ends in a
            status other than ``Succeeded``.
        """
        start = time.time()
        while True:
            if time.time() - start > timeout:
                raise Exception("Timeout waiting for async operation")
            ret = self.check_async_operation(response)
            if ret is None:
                return
            if ret['status'] == 'InProgress':
                continue
            if ret['status'] == 'Succeeded':
                return ret
            # Interpolate here: Exception does not apply %-formatting
            # to its arguments the way logging calls do.
            raise Exception("Unhandled async operation result: %s" %
                            (ret['status'],))

    def _upload_chunk(self, url, start, end, data):
        # Upload one page range, retrying up to 10 times with a
        # linearly growing pause; the last failure is re-raised.
        headers = {
            'x-ms-blob-type': 'PageBlob',
            'x-ms-page-write': 'Update',
            'Content-Length': str(len(data)),
            'Range': f'bytes={start}-{end}',
        }
        attempts = 10
        for x in range(attempts):
            try:
                requests.put(url, headers=headers, data=data,
                             timeout=self.TIMEOUT).raise_for_status()
                break
            except Exception:
                if x == attempts - 1:
                    raise
                else:
                    time.sleep(2 * x)

    def upload_page_blob_to_sas_url(self, url, file_object,
                                    pagesize=(4 * 1024 * 1024),
                                    concurrency=10):
        """Upload ``file_object`` to a page blob SAS URL in parallel chunks.

        :param url: SAS URL; ``&comp=page`` is appended when missing.
        :param file_object: Binary file-like object read ``pagesize``
            bytes at a time.
        :param pagesize: Size of each uploaded chunk in bytes.
        :param concurrency: Number of upload worker threads.
        :raises Exception: whatever a chunk upload raised after its
            retries were exhausted.
        """
        start = 0
        futures = set()
        if 'comp=page' not in url:
            url += '&comp=page'
        with concurrent.futures.ThreadPoolExecutor(
                max_workers=concurrency) as executor:
            while True:
                chunk = file_object.read(pagesize)
                if not chunk:
                    break
                end = start + len(chunk) - 1
                future = executor.submit(self._upload_chunk, url,
                                         start, end, chunk)
                start += len(chunk)
                futures.add(future)
                # Keep the pool of work supplied with data but without
                # reading the entire file into memory.
                if len(futures) >= (concurrency * 2):
                    (done, futures) = concurrent.futures.wait(
                        futures,
                        return_when=concurrent.futures.FIRST_COMPLETED)
                    # result() re-raises a failed upload; without it a
                    # missing chunk would go unnoticed.
                    for finished in done:
                        finished.result()
            # We're done reading the file, wait for all uploads to finish
            (done, futures) = concurrent.futures.wait(futures)
            for finished in done:
                finished.result()

View File

@ -1,303 +0,0 @@
# Copyright 2018 Red Hat
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import voluptuous as v
import os
from nodepool.driver import ConfigPool
from nodepool.driver import ConfigValue
from nodepool.driver import ProviderConfig
class AzureProviderCloudImage(ConfigValue):
    """Configuration of one ``cloud-images`` entry of an Azure provider."""

    def __init__(self, image, zuul_public_key):
        """
        :param image: The cloud image configuration mapping.
        :param zuul_public_key: Provider-level key used when the image
            entry has no ``key`` of its own.
        """
        default_port_mapping = {
            'ssh': 22,
            'winrm': 5986,
        }
        self.name = image['name']
        self.username = image['username']
        # TODO(corvus): remove zuul_public_key
        self.key = image.get('key', zuul_public_key)
        self.image_reference = image['image-reference']
        self.python_path = image.get('python-path')
        self.connection_type = image.get('connection-type', 'ssh')
        # The default port follows the connection type; unknown types
        # fall back to 22.
        self.connection_port = image.get(
            'connection-port',
            default_port_mapping.get(self.connection_type, 22))

    @property
    def external_name(self):
        '''Human readable version of external.'''
        # This class never assigns image_id, so read it defensively
        # instead of raising AttributeError.
        return getattr(self, 'image_id', None) or self.name

    @staticmethod
    def getSchema():
        """Return the voluptuous schema dict for a cloud image entry."""
        azure_image_reference = {
            v.Required('sku'): str,
            v.Required('publisher'): str,
            v.Required('version'): str,
            v.Required('offer'): str,
        }

        return {
            v.Required('name'): str,
            v.Required('username'): str,
            # TODO(corvus): make required when zuul_public_key removed
            'key': str,
            v.Required('image-reference'): azure_image_reference,
            'connection-type': str,
            'connection-port': int,
            'python-path': str,
            # TODO(corvus): shell-type
        }
class AzureProviderDiskImage(ConfigValue):
    """Configuration of one ``diskimages`` entry of an Azure provider."""

    def __init__(self, image, diskimage):
        """
        :param image: The provider diskimage configuration mapping.
        :param diskimage: The matching top-level diskimage object; the
            ``vhd`` image type is added to its ``image_types``.
        """
        self.name = image['name']
        diskimage.image_types.add('vhd')
        self.pause = bool(image.get('pause', False))
        self.python_path = image.get('python-path')
        self.username = image.get('username')
        self.key = image.get('key')
        self.connection_type = image.get('connection-type', 'ssh')
        # The default port follows the connection type; unknown types
        # fall back to 22.
        ports_by_type = {'ssh': 22, 'winrm': 5986}
        fallback_port = ports_by_type.get(self.connection_type, 22)
        self.connection_port = image.get('connection-port', fallback_port)
        self.meta = {}

    @property
    def external_name(self):
        '''Human readable version of external.'''
        return self.name

    @staticmethod
    def getSchema():
        """Return the voluptuous schema dict for a diskimage entry."""
        schema = {
            v.Required('name'): str,
            'username': str,
            'key': str,
            'pause': bool,
            'connection-type': str,
            'connection-port': int,
            'python-path': str,
            # TODO(corvus): shell-type
        }
        return schema
class AzureLabel(ConfigValue):
    """Configuration of one label inside an Azure pool."""
    ignore_equality = ['pool']

    def __init__(self, label, provider_config, provider_pool):
        """
        :param label: The label configuration mapping.
        :param provider_config: Provider config whose ``cloud_images``
            and ``diskimages`` are used to resolve image names.
        :param provider_pool: The pool this label belongs to.
        :raises ValueError: when a referenced cloud-image or diskimage
            is not defined in the provider.
        """
        self.hardware_profile = None
        self.name = label['name']
        self.pool = provider_pool

        self.cloud_image = self._resolve_image(
            'cloud-image', label.get('cloud-image'),
            provider_config, 'cloud_images')
        self.diskimage = self._resolve_image(
            'diskimage', label.get('diskimage'),
            provider_config, 'diskimages')

        self.hardware_profile = label['hardware-profile']
        self.tags = label.get('tags', {})

    def _resolve_image(self, kind, image_name, provider_config, registry):
        # Look image_name up in provider_config.<registry>.  Returns
        # None when the label does not reference an image of this kind.
        if not image_name:
            return None
        image = getattr(provider_config, registry).get(image_name, None)
        if not image:
            raise ValueError(
                "%s %s does not exist in provider %s"
                " but is referenced in label %s" %
                (kind, image_name, provider_config.name, self.name))
        return image

    @staticmethod
    def getSchema():
        """Return the voluptuous schema dict for a label entry."""
        azure_hardware_profile = {
            v.Required('vm-size'): str,
        }
        schema = {
            v.Required('name'): str,
            'cloud-image': str,
            'diskimage': str,
            v.Required('hardware-profile'): azure_hardware_profile,
            'tags': dict,
        }
        return schema
class AzurePool(ConfigPool):
    """Configuration of one pool of an Azure provider.

    Networking options default to the provider-level values.
    """
    ignore_equality = ['provider']

    def __init__(self, provider_config, pool_config):
        """
        :param provider_config: Provider config supplying the defaults.
        :param pool_config: The pool configuration mapping.
        """
        super().__init__()
        self.provider = provider_config
        self.load(pool_config)

    def load(self, pool_config):
        """Populate the pool attributes from ``pool_config``."""
        self.name = pool_config['name']
        self.max_servers = pool_config['max-servers']
        self.public_ipv4 = pool_config.get('public-ipv4',
                                           self.provider.public_ipv4)
        self.public_ipv6 = pool_config.get('public-ipv6',
                                           self.provider.public_ipv6)
        self.ipv4 = pool_config.get('ipv4', self.provider.ipv4)
        self.ipv6 = pool_config.get('ipv6', self.provider.ipv6)
        # Requesting a public address implies that address family.
        self.ipv4 = self.ipv4 or self.public_ipv4
        self.ipv6 = self.ipv6 or self.public_ipv6
        # Fall back to IPv4 only when neither family is enabled.  The
        # parentheses matter: without them IPv4 was forced on whenever
        # IPv6 was enabled, making an IPv6-only pool impossible.
        if not (self.ipv4 or self.ipv6):
            self.ipv4 = True
        self.use_internal_ip = pool_config.get(
            'use-internal-ip', self.provider.use_internal_ip)
        # Default to the provider's host-key-checking setting (not its
        # use-internal-ip setting).
        self.host_key_checking = pool_config.get(
            'host-key-checking', self.provider.host_key_checking)

    @staticmethod
    def getSchema():
        """Return the voluptuous schema dict for a pool entry."""
        azure_label = AzureLabel.getSchema()

        pool = ConfigPool.getCommonSchemaDict()
        pool.update({
            v.Required('name'): str,
            v.Required('labels'): [azure_label],
            'ipv4': bool,
            'ipv6': bool,
            'public-ipv4': bool,
            'public-ipv6': bool,
            'use-internal-ip': bool,
            'host-key-checking': bool,
        })
        return pool
class AzureProviderConfig(ProviderConfig):
    """Parsed configuration of an Azure provider."""

    def __init__(self, driver, provider):
        """
        :param driver: The driver creating this config (not used here).
        :param provider: The raw provider configuration mapping.
        """
        super().__init__(provider)
        self._pools = {}
        self.rate_limit = None
        self.launch_retries = None

    @property
    def pools(self):
        """Mapping of pool name to AzurePool."""
        return self._pools

    @property
    def manage_images(self):
        """Always True for this provider."""
        return True

    @staticmethod
    def reset():
        pass

    def load(self, config):
        """Populate this object from the provider mapping.

        :param config: The top-level config; its ``diskimages`` and
            ``labels`` are consulted, and each label's ``pools`` list
            is extended with the pools that supply it.
        """
        self.image_type = 'vhd'
        self.image_name_format = '{image_name}-{timestamp}'
        self.post_upload_hook = self.provider.get('post-upload-hook')

        self.rate_limit = self.provider.get('rate-limit', 1)
        self.launch_retries = self.provider.get('launch-retries', 3)
        self.boot_timeout = self.provider.get('boot-timeout', 60)

        # TODO(corvus): remove
        self.zuul_public_key = self.provider.get('zuul-public-key')
        self.location = self.provider['location']
        self.subnet_id = self.provider.get('subnet-id')
        self.network = self.provider.get('network')
        # Don't use these directly; these are default values for
        # labels.
        self.public_ipv4 = self.provider.get('public-ipv4', False)
        self.public_ipv6 = self.provider.get('public-ipv6', False)
        self.ipv4 = self.provider.get('ipv4', None)
        self.ipv6 = self.provider.get('ipv6', None)
        self.use_internal_ip = self.provider.get('use-internal-ip', False)
        self.host_key_checking = self.provider.get('host-key-checking', True)
        self.resource_group = self.provider['resource-group']
        self.resource_group_location = self.provider['resource-group-location']
        self.auth_path = self.provider.get(
            'auth-path', os.getenv('AZURE_AUTH_LOCATION', None))

        # A provider may define only cloud images or only diskimages,
        # so a missing list is treated as empty rather than an error.
        self.cloud_images = {}
        for image in self.provider.get('cloud-images', []):
            i = AzureProviderCloudImage(image, self.zuul_public_key)
            self.cloud_images[i.name] = i

        self.diskimages = {}
        for image in self.provider.get('diskimages', []):
            diskimage = config.diskimages[image['name']]
            i = AzureProviderDiskImage(image, diskimage)
            self.diskimages[i.name] = i

        for pool in self.provider.get('pools', []):
            pp = AzurePool(self, pool)
            self._pools[pp.name] = pp

            for label in pool.get('labels', []):
                pl = AzureLabel(label, self, pp)
                pp.labels[pl.name] = pl
                config.labels[pl.name].pools.append(pp)

    def getSchema(self):
        """Return the voluptuous Schema validating the provider mapping."""
        provider_cloud_images = AzureProviderCloudImage.getSchema()

        pool = AzurePool.getSchema()

        provider = ProviderConfig.getCommonSchemaDict()
        provider.update({
            v.Required('pools'): [pool],
            v.Required('location'): str,
            v.Required('resource-group'): str,
            v.Required('resource-group-location'): str,
            'subnet-id': str,
            'network': v.Any(str, {
                'resource-group': str,
                'network': str,
                'subnet': str,
            }),
            v.Required('cloud-images'): [provider_cloud_images],
            v.Required('auth-path'): str,
            'ipv4': bool,
            'ipv6': bool,
            'public-ipv4': bool,
            'public-ipv6': bool,
            'use-internal-ip': bool,
            'host-key-checking': bool,
        })
        return v.Schema(provider)

    def getSupportedLabels(self, pool_name=None):
        """Return the set of label names offered by the matching pools.

        :param pool_name: Restrict to this pool; all pools when falsy.
        """
        labels = set()
        for pool in self._pools.values():
            if not pool_name or (pool.name == pool_name):
                labels.update(pool.labels.keys())
        return labels

View File

@ -433,6 +433,8 @@ class StateMachineProvider(Provider, QuotaSupport):
framework"""
log = logging.getLogger("nodepool.driver.statemachine."
"StateMachineProvider")
MINIMUM_SLEEP = 1
MAXIMUM_SLEEP = 10
def __init__(self, adapter, provider):
super().__init__()
@ -495,9 +497,10 @@ class StateMachineProvider(Provider, QuotaSupport):
self.launchers.remove(sm)
loop_end = time.monotonic()
if self.launchers or self.deleters:
time.sleep(max(0, 10 - (loop_end - loop_start)))
time.sleep(max(0, self.MAXIMUM_SLEEP -
(loop_end - loop_start)))
else:
time.sleep(1)
time.sleep(self.MINIMUM_SLEEP)
def getRequestHandler(self, poolworker, request):
return StateMachineHandler(poolworker, request)

View File

@ -0,0 +1,59 @@
elements-dir: .
images-dir: '{images_dir}'
build-log-dir: '{build_log_dir}'
build-log-retention: 1
webapp:
port: 8005
listen_address: '0.0.0.0'
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}
chroot: {zookeeper_chroot}
zookeeper-tls:
ca: {zookeeper_ca}
cert: {zookeeper_cert}
key: {zookeeper_key}
labels:
- name: bionic
min-ready: 0
providers:
- name: azure
driver: azure
zuul-public-key: ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC+mplenM+m6pNY9Un3fpO9eqf808Jrfb3d1gXg7BZVawCvtEZ/cDYvLQ3OF1AeL2kcIC0UAIglM5JXae7yO5CJbJRdkbXvv0u1LvpLxYSPM4ATR0r4IseC5YVxkfJQNi4ixSwTqD4ScEkuCXcSqSU9M+hB+KlnwXoR4IcYHf7vD2Z0Mdwm2ikk3SeERmspmMxx/uz0SPn58QxONuoTlNWQKqDWsV6bRyoPa6HWccMrIH1/e7E69Nw/30oioOQpKBgaDCauh+QkDtSkjRpRMOV47ZFh16Q9DqMgLx+FD8z6++9rsHlB65Zas1xyQsiRCFG09s00b7OR7Xz9ukQ5+vXV
resource-group-location: centralus
location: centralus
resource-group: nodepool
auth-path: {auth_path}
subnet-id: /subscriptions/c35cf7df-ed75-4c85-be00-535409a85120/resourceGroups/nodepool/providers/Microsoft.Network/virtualNetworks/NodePool/subnets/default
diskimages:
- name: fake-image
pools:
- name: main
max-servers: 10
labels:
- name: bionic
diskimage: fake-image
hardware-profile:
vm-size: Standard_B1ls
tags:
department: R&D
team: DevOps
systemPurpose: CI
diskimages:
- name: fake-image
elements:
- fedora
- vm
release: 21
dib-cmd: nodepool/tests/fake-image-create
env-vars:
TMPDIR: /opt/dib_tmp
DIB_IMAGE_CACHE: /opt/dib_cache
DIB_CLOUD_IMAGES: http://download.fedoraproject.org/pub/fedora/linux/releases/test/21-Beta/Cloud/Images/x86_64/
BASE_IMAGE_FILE: Fedora-Cloud-Base-20141029-21_Beta.x86_64.qcow2

View File

@ -14,7 +14,7 @@ zookeeper-tls:
labels:
- name: bionic
min-ready: 1
min-ready: 0
providers:
- name: azure
@ -40,7 +40,7 @@ providers:
- name: bionic
cloud-image: bionic
hardware-profile:
vm-size: Standard_D1_v2
vm-size: Standard_B1ls
tags:
department: R&D
team: DevOps

View File

@ -24,6 +24,7 @@ import uuid
import fixtures
import responses
import requests
class CRUDManager:
@ -42,7 +43,11 @@ class CRUDManager:
for item in self.items:
if item['id'] == url.path:
return (200, {}, json.dumps(item))
return (404, {}, json.dumps({'error': {'message': 'Not Found'}}))
return (404, {}, json.dumps({
'error': {
'message': 'Not Found',
'code': 'NotFound',
}}))
class ResourceGroupsCRUD(CRUDManager):
@ -77,7 +82,7 @@ class PublicIPAddressesCRUD(CRUDManager):
"publicIPAddressVersion": "IPv4",
"publicIPAllocationMethod": "Dynamic",
"idleTimeoutInMinutes": 4,
"ipTags": []
"ipTags": [],
}
self.items.append(data)
ret = json.dumps(data)
@ -189,7 +194,65 @@ class DisksCRUD(CRUDManager):
"provisioningState": "Succeeded",
}
self.items.append(data)
return (200, {}, json.dumps(data))
async_url = 'https://management.azure.com/async' + request.path_url
headers = {'Azure-AsyncOperation': async_url,
'Retry-After': '0'}
return (200, headers, json.dumps(data))
def post(self, request):
    """Fake handler for the disk ``beginGetAccess``/``endGetAccess`` actions.

    ``beginGetAccess`` also stores a fake SAS URL on the named disk.
    The request body is echoed back, together with headers pointing at
    the fake async-operation URL for the disk.
    """
    payload = json.loads(request.body)
    segments = urllib.parse.urlparse(request.path_url).path.split('/')
    # The path ends in .../<disk name>/<action>
    name = segments[-2]
    action = segments[-1]
    async_base = 'https://management.azure.com/async' + request.path_url
    if action == 'beginGetAccess':
        async_url = async_base.replace('/beginGetAccess', '')
        for item in self.items:
            if item['name'] != name:
                continue
            item['accessSAS'] = 'https://management.azure.com/sas/' + name
    if action == 'endGetAccess':
        async_url = async_base.replace('/endGetAccess', '')
    headers = {'Azure-AsyncOperation': async_url,
               'Retry-After': '0'}
    return (200, headers, json.dumps(payload))
def delete(self, request):
    """Fake handler that removes the disk named by the last path segment.

    Returns 200 with async-operation headers when the disk existed,
    otherwise a 404 "Not Found" error document.
    """
    name = urllib.parse.urlparse(request.path_url).path.split('/')[-1]
    async_url = 'https://management.azure.com/async' + request.path_url
    found = next(
        (item for item in self.items if item['name'] == name), None)
    if found is None:
        not_found = {
            'error': {
                'message': 'Not Found',
                'code': 'NotFound',
            }}
        return (404, {}, json.dumps(not_found))
    self.items.remove(found)
    headers = {'Azure-AsyncOperation': async_url,
               'Retry-After': '0'}
    return (200, headers, '')
class ImagesCRUD(CRUDManager):
    """Fake ``Microsoft.Compute/images`` endpoint."""
    name = "Microsoft.Compute/images"

    def put(self, request):
        """Store the posted image and answer as a finished async operation."""
        image = json.loads(request.body)
        path = urllib.parse.urlparse(request.path_url).path
        image['id'] = path
        # The image name is the last path segment.
        image['name'] = path.split('/')[-1]
        image['type'] = self.name
        image['properties'] = {
            "provisioningState": "Succeeded",
        }
        self.items.append(image)
        headers = {
            'Azure-AsyncOperation':
                'https://management.azure.com/async' + request.path_url,
            'Retry-After': '0',
        }
        return (200, headers, json.dumps(image))
class FakeAzureFixture(fixtures.Fixture):
@ -227,6 +290,34 @@ class FakeAzureFixture(fixtures.Fixture):
'expires_on': time.time() + 600,
})
self.responses.add_callback(
responses.GET,
('https://management.azure.com/subscriptions/'
f'{self.subscription_id}/providers/Microsoft.Compute/skus/'
'?api-version=2019-04-01'),
callback=self._get_compute_skus,
content_type='application/json')
self.responses.add_callback(
responses.GET,
('https://management.azure.com/subscriptions/'
f'{self.subscription_id}/providers/Microsoft.Compute/locations/'
'centralus/usages?api-version=2020-12-01'),
callback=self._get_compute_usages,
content_type='application/json')
async_re = re.compile('https://management.azure.com/async/(.*)')
self.responses.add_callback(
responses.GET, async_re,
callback=self._get_async,
content_type='application/json')
sas_re = re.compile('https://management.azure.com/sas/(.*)')
self.responses.add_callback(
responses.PUT, sas_re,
callback=self._put_sas,
content_type='application/json')
self._setup_crud(ResourceGroupsCRUD, '2020-06-01',
resource_grouped=False)
@ -234,6 +325,7 @@ class FakeAzureFixture(fixtures.Fixture):
self._setup_crud(NetworkInterfacesCRUD, '2020-07-01')
self._setup_crud(PublicIPAddressesCRUD, '2020-07-01')
self._setup_crud(DisksCRUD, '2020-06-30')
self._setup_crud(ImagesCRUD, '2020-12-01')
self.addCleanup(self.responses.stop)
self.addCleanup(self.responses.reset)
@ -249,11 +341,11 @@ class FakeAzureFixture(fixtures.Fixture):
list_re = re.compile(
'https://management.azure.com/subscriptions/'
+ f'{self.subscription_id}/'
+ rg + f'{manager.name}?\\?api-version={api_version}')
+ rg + f'{manager.name}/?\\?api-version={api_version}')
crud_re = re.compile(
'https://management.azure.com/subscriptions/'
+ f'{self.subscription_id}/'
+ rg + f'{manager.name}/(.*?)?\\?api-version={api_version}')
+ rg + f'{manager.name}/(.+?)\\?api-version={api_version}')
self.responses.add_callback(
responses.GET, list_re, callback=self.crud[manager.name].list,
content_type='application/json')
@ -263,8 +355,100 @@ class FakeAzureFixture(fixtures.Fixture):
self.responses.add_callback(
responses.PUT, crud_re, callback=self.crud[manager.name].put,
content_type='application/json')
if hasattr(self.crud[manager.name], 'post'):
self.responses.add_callback(
responses.POST, crud_re, callback=self.crud[manager.name].post,
content_type='application/json')
if hasattr(self.crud[manager.name], 'delete'):
self.responses.add_callback(
responses.DELETE, crud_re,
callback=self.crud[manager.name].delete,
content_type='application/json')
def _extract_resource_group(self, path):
url = re.compile('/subscriptions/(.*?)/resourceGroups/(.*?)/')
m = url.match(path)
return m.group(2)
def _get_compute_skus(self, request):
data = {
'value': [
{'capabilities': [
{'name': 'MaxResourceVolumeMB', 'value': '4096'},
{'name': 'OSVhdSizeMB', 'value': '1047552'},
{'name': 'vCPUs', 'value': '1'},
{'name': 'HyperVGenerations', 'value': 'V1,V2'},
{'name': 'MemoryGB', 'value': '0.5'},
{'name': 'MaxDataDiskCount', 'value': '2'},
{'name': 'LowPriorityCapable', 'value': 'False'},
{'name': 'PremiumIO', 'value': 'True'},
{'name': 'VMDeploymentTypes', 'value': 'IaaS'},
{'name': 'CombinedTempDiskAndCachedIOPS', 'value': '200'},
{'name': 'CombinedTempDiskAndCachedReadBytesPerSecond',
'value': '10485760'},
{'name': 'CombinedTempDiskAndCachedWriteBytesPerSecond',
'value': '10485760'},
{'name': 'UncachedDiskIOPS', 'value': '160'},
{'name': 'UncachedDiskBytesPerSecond',
'value': '10485760'},
{'name': 'EphemeralOSDiskSupported', 'value': 'True'},
{'name': 'EncryptionAtHostSupported', 'value': 'True'},
{'name': 'AcceleratedNetworkingEnabled', 'value': 'False'},
{'name': 'RdmaEnabled', 'value': 'False'},
{'name': 'MaxNetworkInterfaces', 'value': '2'}],
'family': 'standardBSFamily',
'locationInfo': [
{'location': 'centralus',
'zoneDetails': [
{'Name': ['3', '2', '1'],
'capabilities': [
{'name': 'UltraSSDAvailable',
'value': 'True'}]}],
'zones': ['3', '1', '2']}],
'locations': ['centralus'],
'name': 'Standard_B1ls',
'resourceType': 'virtualMachines',
'restrictions': [],
'size': 'B1ls',
'tier': 'Standard'}
]
}
return (200, {}, json.dumps(data))
def _get_compute_usages(self, request):
mgr = self.crud["Microsoft.Compute/virtualMachines"]
data = {
'value': [
{
"limit": 4,
"unit": "Count",
"currentValue": len(mgr.items),
"name": {
"value": "cores",
"localizedValue": "Total Regional vCPUs"
}
}, {
"limit": 25000,
"unit": "Count",
"currentValue": len(mgr.items),
"name": {
"value": "virtualMachines",
"localizedValue": "Virtual Machines"
}
}
]}
return (200, {}, json.dumps(data))
def _get_async(self, request):
    """Answer a poll of an asynchronous-operation URL.

    The leading ``/async`` is sliced off the request's ``path_url``,
    the remainder is fetched from ``https://management.azure.com``,
    and that response's JSON is returned as the output of an operation
    reported as ``Succeeded``.

    :returns: A ``(status, headers, body)`` tuple whose body is JSON.
    """
    prefix_length = len('/async')
    target = 'https://management.azure.com' + request.path_url[prefix_length:]
    upstream = requests.get(target)
    body = {
        'status': 'Succeeded',
        'properties': {
            'output': upstream.json(),
        },
    }
    return (200, {}, json.dumps(body))
def _put_sas(self, request):
return (201, {}, '')

View File

@ -18,6 +18,7 @@ import logging
from nodepool import tests
from nodepool import zk
from nodepool.driver.statemachine import StateMachineProvider
from . import fake_azure
@ -28,10 +29,12 @@ class TestDriverAzure(tests.DBTestCase):
def setUp(self):
    """Shorten the provider sleep bounds and install the fake Azure.

    ``MINIMUM_SLEEP`` and ``MAXIMUM_SLEEP`` are set on the
    ``StateMachineProvider`` class itself, so the overrides would
    otherwise outlive this test.  The previous values are put back by
    cleanups registered here.
    """
    super().setUp()
    # Assumes StateMachineProvider already defines both attributes;
    # remember them so the class is left as it was found.
    previous_minimum = StateMachineProvider.MINIMUM_SLEEP
    previous_maximum = StateMachineProvider.MAXIMUM_SLEEP
    self.addCleanup(
        setattr, StateMachineProvider, 'MINIMUM_SLEEP', previous_minimum)
    self.addCleanup(
        setattr, StateMachineProvider, 'MAXIMUM_SLEEP', previous_maximum)
    StateMachineProvider.MINIMUM_SLEEP = 0.1
    StateMachineProvider.MAXIMUM_SLEEP = 1
    self.fake_azure = fake_azure.FakeAzureFixture()
    self.useFixture(self.fake_azure)
def test_azure_machine(self):
def test_azure_cloud_image(self):
configfile = self.setup_config(
'azure.yaml',
auth_path=self.fake_azure.auth_file.name)
@ -51,3 +54,32 @@ class TestDriverAzure(tests.DBTestCase):
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
def test_azure_diskimage(self):
    """Build a disk image, then fulfil a node request that uses it.

    Runs a builder against ``azure-diskimage.yaml`` and waits for
    ``fake-image``, then starts a launcher with a fresh copy of the
    same config and checks that a ``bionic`` request yields a READY
    node reachable over ssh.
    """
    # Phase 1: builder produces the image.
    builder_config = self.setup_config(
        'azure-diskimage.yaml',
        auth_path=self.fake_azure.auth_file.name)
    self.useBuilder(builder_config)
    uploaded = self.waitForImage('azure', 'fake-image')
    self.assertEqual(uploaded.username, 'zuul')

    # Phase 2: launcher, configured from a second setup_config() call.
    launcher_config = self.setup_config(
        'azure-diskimage.yaml',
        auth_path=self.fake_azure.auth_file.name)
    launcher = self.useNodepool(launcher_config, watermark_sleep=1)
    launcher.start()

    # Phase 3: submit a request and wait for it to be processed.
    request = zk.NodeRequest()
    request.state = zk.REQUESTED
    request.node_types.append('bionic')
    self.zk.storeNodeRequest(request)
    fulfilled = self.waitForNodeRequest(request)

    self.assertEqual(fulfilled.state, zk.FULFILLED)
    self.assertNotEqual(fulfilled.nodes, [])

    # Phase 4: inspect the first node assigned to the request.
    node = self.zk.getNode(fulfilled.nodes[0])
    self.assertEqual(node.allocated_to, fulfilled.id)
    self.assertEqual(node.state, zk.READY)
    self.assertIsNotNone(node.launcher)
    self.assertEqual(node.connection_type, 'ssh')