Azure: switch to Azul

The Azure SDK for Python uses threads to manage async operations.
Every time a virtual machine is created, a new thread is spawned
to wait for it to finish (whether we actually end up polling it or
not).  This will cause the Azure driver to have significant
scalability limits compared to other drivers, possibly limiting
the number of simultaneous nodes to 50% compared to others.

To address this, switch to using a very simple requests-based
REST client I'm calling Azul.  The consistency of the Azure API
makes this simple.  As a bonus, we can use the excellent Azure
REST API documentation directly, rather that mapping attribute
names through the Python SDK (which has subtle differences).

A new fake Azure test fixture is also created in order to make
the current unit test a more thorough exercise of the code.

Finally, the "zuul-private-key" attribute is misnamed since we
have a policy of a one-way dependency from Zuul -> Nodepool.  It's
name is updated to match the GCE driver ("key") and moved to the
cloud-image section so that different images may be given different
keys.

Change-Id: I87bfa65733b2a71b294ebe2cf0d3404d0e4333c5
This commit is contained in:
James E. Blair 2021-03-04 18:59:30 -08:00
parent 6efac6cea2
commit 91804a5e16
11 changed files with 815 additions and 386 deletions

View File

@ -34,15 +34,15 @@ section of the configuration.
providers:
- name: azure-central-us
driver: azure
zuul-public-key: ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAA...
resource-group-location: centralus
location: centralus
resource-group: ZuulCIDev
resource-group: nodepool
auth-path: /Users/grhayes/.azure/nodepoolCreds.json
subnet-id: /subscriptions/<subscription-id>/resourceGroups/ZuulCI/providers/Microsoft.Network/virtualNetworks/NodePool/subnets/default
subnet-id: /subscriptions/<subscription-id>/resourceGroups/nodepool/providers/Microsoft.Network/virtualNetworks/NodePool/subnets/default
cloud-images:
- name: bionic
username: zuul
key: ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAA...
image-reference:
sku: 18.04-LTS
publisher: Canonical
@ -70,10 +70,16 @@ section of the configuration.
Name of the Azure region to interact with.
.. attr:: resource-group
:required:
Name of the Resource Group in which to place the Nodepool nodes.
.. attr:: resource-group-location
:required:
Name of the Azure region to where the home Resource Group is or should be created.
Name of the Azure region where the home Resource Group is or
should be created.
.. attr:: auth-path
:required:
@ -128,6 +134,11 @@ section of the configuration.
The username that a consumer should use when connecting to the
node.
.. attr:: key
:type: str
The SSH public key that should be installed on the node.
.. attr:: image-reference
:type: dict
:required:

View File

@ -0,0 +1,269 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import requests
import logging
import time
class AzureAuth(requests.auth.AuthBase):
AUTH_URL = "https://login.microsoftonline.com/{tenantId}/oauth2/token"
def __init__(self, credential):
self.log = logging.getLogger("azul.auth")
self.credential = credential
self.token = None
self.expiration = time.time()
def refresh(self):
if self.expiration - time.time() < 60:
self.log.debug('Refreshing authentication token')
url = self.AUTH_URL.format(**self.credential)
data = {
'grant_type': 'client_credentials',
'client_id': self.credential['clientId'],
'client_secret': self.credential['clientSecret'],
'resource': 'https://management.azure.com/',
}
r = requests.post(url, data)
ret = r.json()
self.token = ret['access_token']
self.expiration = float(ret['expires_on'])
def __call__(self, r):
self.refresh()
r.headers["authorization"] = "Bearer " + self.token
return r
class AzureError(Exception):
def __init__(self, status_code, message):
super().__init__(message)
self.status_code = status_code
class AzureNotFoundError(AzureError):
def __init__(self, status_code, message):
super().__init__(status_code, message)
class AzureResourceGroupsCRUD:
def __init__(self, cloud, version):
self.cloud = cloud
self.version = version
def url(self, url, **args):
base_url = (
'https://management.azure.com/subscriptions/{subscriptionId}'
'/resourcegroups/')
url = base_url + url + '?api-version={apiVersion}'
args = args.copy()
args.update(self.cloud.credential)
args['apiVersion'] = self.version
return url.format(**args)
def list(self):
url = self.url('')
return self.cloud.paginate(self.cloud.get(url))
def get(self, name):
url = self.url(name)
return self.cloud.get(url)
def create(self, name, params):
url = self.url(name)
return self.cloud.put(url, params)
def delete(self, name):
url = self.url(name)
return self.cloud.delete(url)
class AzureCRUD:
def __init__(self, cloud, resource, version):
self.cloud = cloud
self.resource = resource
self.version = version
def url(self, url, **args):
base_url = (
'https://management.azure.com/subscriptions/{subscriptionId}'
'/resourceGroups/{resourceGroupName}/providers/')
url = base_url + url + '?api-version={apiVersion}'
args = args.copy()
args.update(self.cloud.credential)
args['apiVersion'] = self.version
return url.format(**args)
def id_url(self, url, **args):
base_url = 'https://management.azure.com'
url = base_url + url + '?api-version={apiVersion}'
args = args.copy()
args['apiVersion'] = self.version
return url.format(**args)
def list(self, resource_group_name):
url = self.url(
self.resource,
resourceGroupName=resource_group_name,
)
return self.cloud.paginate(self.cloud.get(url))
def get_by_id(self, resource_id):
url = self.id_url(resource_id)
return self.cloud.get(url)
def get(self, resource_group_name, name):
url = self.url(
'{_resource}/{_resourceName}',
_resource=self.resource,
_resourceName=name,
resourceGroupName=resource_group_name,
)
return self.cloud.get(url)
def create(self, resource_group_name, name, params):
url = self.url(
'{_resource}/{_resourceName}',
_resource=self.resource,
_resourceName=name,
resourceGroupName=resource_group_name,
)
return self.cloud.put(url, params)
def delete(self, resource_group_name, name):
url = self.url(
'{_resource}/{_resourceName}',
_resource=self.resource,
_resourceName=name,
resourceGroupName=resource_group_name,
)
return self.cloud.delete(url)
class AzureDictResponse(dict):
def __init__(self, response, *args):
super().__init__(*args)
self.response = response
self.last_retry = time.time()
class AzureListResponse(list):
def __init__(self, response, *args):
super().__init__(*args)
self.response = response
self.last_retry = time.time()
class AzureCloud:
TIMEOUT = 60
def __init__(self, credential):
self.credential = credential
self.session = requests.Session()
self.log = logging.getLogger("azul")
self.auth = AzureAuth(credential)
self.network_interfaces = AzureCRUD(
self,
'Microsoft.Network/networkInterfaces',
'2020-07-01')
self.public_ip_addresses = AzureCRUD(
self,
'Microsoft.Network/publicIPAddresses',
'2020-07-01')
self.virtual_machines = AzureCRUD(
self,
'Microsoft.Compute/virtualMachines',
'2020-12-01')
self.disks = AzureCRUD(
self,
'Microsoft.Compute/disks',
'2020-06-30')
self.resource_groups = AzureResourceGroupsCRUD(
self,
'2020-06-01')
def get(self, url, codes=[200]):
return self.request('GET', url, None, codes)
def put(self, url, data, codes=[200, 201]):
return self.request('PUT', url, data, codes)
def delete(self, url, codes=[200, 201, 202, 204]):
return self.request('DELETE', url, None, codes)
def request(self, method, url, data, codes):
self.log.debug('%s: %s %s' % (method, url, data))
response = self.session.request(
method, url, json=data,
auth=self.auth, timeout=self.TIMEOUT,
headers={'Accept': 'application/json',
'Accept-Encoding': 'gzip'})
self.log.debug("Received headers: %s", response.headers)
if response.status_code in codes:
if len(response.text):
self.log.debug("Received: %s", response.text)
ret_data = response.json()
if isinstance(ret_data, list):
return AzureListResponse(response, ret_data)
else:
return AzureDictResponse(response, ret_data)
self.log.debug("Empty response")
return AzureDictResponse(response, {})
err = response.json()
self.log.error(response.text)
if response.status_code == 404:
raise AzureNotFoundError(
response.status_code, err['error']['message'])
else:
raise AzureError(response.status_code, err['error']['message'])
def paginate(self, data):
ret = data['value']
while 'nextLink' in data:
data = self.get(data['nextLink'])
ret += data['value']
return ret
def check_async_operation(self, response):
resp = response.response
location = resp.headers.get(
'Azure-AsyncOperation',
resp.headers.get('Location', None))
if not location:
self.log.debug("No async operation found")
return None
remain = (response.last_retry +
float(resp.headers.get('Retry-After', 2))) - time.time()
self.log.debug("remain time %s", remain)
if remain > 0:
time.sleep(remain)
response.last_retry = time.time()
return self.get(location)
def wait_for_async_operation(self, response, timeout=600):
start = time.time()
while True:
if time.time() - start > timeout:
raise Exception("Timeout waiting for async operation")
ret = self.check_async_operation(response)
if ret is None:
return
if ret['status'] == 'InProgress':
continue
if ret['status'] == 'Succeeded':
return
raise Exception("Unhandled async operation result: %s",
ret['status'])

View File

@ -22,11 +22,40 @@ from nodepool.driver import ConfigValue
from nodepool.driver import ProviderConfig
class AzureProviderCloudImage(ConfigValue):
def __init__(self):
self.name = None
self.image_id = None
self.username = None
self.key = None
self.python_path = None
self.connection_type = None
self.connection_port = None
def __eq__(self, other):
if isinstance(other, AzureProviderCloudImage):
return (self.name == other.name
and self.image_id == other.image_id
and self.username == other.username
and self.key == other.key
and self.python_path == other.python_path
and self.connection_type == other.connection_type
and self.connection_port == other.connection_port)
return False
def __repr__(self):
return "<AzureProviderCloudImage %s>" % self.name
@property
def external_name(self):
'''Human readable version of external.'''
return self.image_id or self.name
class AzureLabel(ConfigValue):
def __eq__(self, other):
if (other.username != self.username or
other.imageReference != self.imageReference or
other.hardwareProfile != self.hardwareProfile):
if (other.cloud_image != self.cloud_image or
other.hardware_profile != self.hardware_profile):
return False
return True
@ -69,6 +98,10 @@ class AzureProviderConfig(ProviderConfig):
pass
def load(self, config):
default_port_mapping = {
'ssh': 22,
'winrm': 5986,
}
self.zuul_public_key = self.provider['zuul-public-key']
self.location = self.provider['location']
@ -81,7 +114,16 @@ class AzureProviderConfig(ProviderConfig):
self.cloud_images = {}
for image in self.provider['cloud-images']:
self.cloud_images[image['name']] = image
i = AzureProviderCloudImage()
i.name = image['name']
i.username = image['username']
i.key = image.get('key', self.zuul_public_key)
i.image_reference = image['image-reference']
i.connection_type = image.get('connection-type', 'ssh')
i.connection_port = image.get(
'connection-port',
default_port_mapping.get(i.connection_type, 22))
self.cloud_images[i.name] = i
for pool in self.provider.get('pools', []):
pp = AzurePool()
@ -106,13 +148,11 @@ class AzureProviderConfig(ProviderConfig):
"cloud-image %s does not exist in provider %s"
" but is referenced in label %s" %
(cloud_image_name, self.name, pl.name))
pl.imageReference = cloud_image['image-reference']
pl.username = cloud_image.get('username', 'zuul')
pl.cloud_image = cloud_image
else:
pl.imageReference = None
pl.username = 'zuul'
pl.cloud_image = None
pl.hardwareProfile = label['hardware-profile']
pl.hardware_profile = label['hardware-profile']
config.labels[label['name']].pools.append(pp)
pl.tags = label['tags']

View File

@ -51,41 +51,40 @@ class AzureInstanceLauncher(NodeLauncher):
self.log.exception(
"Launch attempt %d/%d failed for node %s:",
attempts, self.retries, self.node.id)
else:
if attempts == self.retries:
raise
attempts += 1
time.sleep(1)
self.node.external_id = instance.id
self.node.external_id = hostname
boot_start = time.monotonic()
while time.monotonic() - boot_start < self.boot_timeout:
state = instance.provisioning_state
self.log.debug("Instance %s is %s" % (instance.id, state))
state = instance['properties']['provisioningState']
self.log.debug("Instance %s is %s" % (hostname, state))
if state == 'Succeeded':
break
time.sleep(0.5)
instance = self.handler.manager.getInstance(instance.id)
instance = self.handler.manager.getInstance(hostname)
if state != 'Succeeded':
raise exceptions.LaunchStatusException(
"Instance %s failed to start: %s" % (instance.id, state))
"Instance %s failed to start: %s" % (hostname, state))
server_ip = self.handler.manager.getIpaddress(instance)
if self.provider_config.ipv6:
server_v6_ip = self.handler.manager.getv6Ipaddress(instance)
if not server_ip:
raise exceptions.LaunchStatusException(
"Instance %s doesn't have a public ip" % instance.id)
"Instance %s doesn't have a public ip" % hostname)
try:
key = utils.nodescan(server_ip, port=22, timeout=180)
except Exception:
raise exceptions.LaunchKeyscanException(
"Can't scan instance %s key" % instance.id)
"Can't scan instance %s key" % hostname)
self.log.info("Instance %s ready" % instance.id)
self.log.info("Instance %s ready" % hostname)
self.node.state = zk.READY
self.node.external_id = instance.id
self.node.hostname = server_ip
self.node.interface_ip = server_ip
self.node.public_ipv4 = server_ip
@ -94,9 +93,9 @@ class AzureInstanceLauncher(NodeLauncher):
self.node.host_keys = key
self.node.connection_port = 22
self.node.connection_type = "ssh"
self.node.username = self.label.username
self.node.username = self.label.cloud_image.username
self.zk.storeNode(self.node)
self.log.info("Instance %s is ready", instance.id)
self.log.info("Instance %s is ready", hostname)
class AzureNodeRequestHandler(NodeRequestHandler):

View File

@ -13,33 +13,21 @@
# under the License.
import logging
from azure.common.client_factory import get_client_from_auth_file
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.compute import ComputeManagementClient
from msrestazure.azure_exceptions import CloudError
import json
from nodepool.driver import Provider
from nodepool.driver.azure import handler
from nodepool import zk
from . import azul
class AzureProvider(Provider):
log = logging.getLogger("nodepool.driver.azure.AzureProvider")
API_VERSION_COMPUTE = "2019-12-01"
API_VERSION_DISKS = "2019-11-01"
API_VERSION_NETWORK = "2020-03-01"
API_VERSION_RESOURCE = "2019-10-01"
def __init__(self, provider, *args):
self.provider = provider
self.zuul_public_key = provider.zuul_public_key
self.compute_client = None
self.disks_client = None
self.network_client = None
self.resource_client = None
self.resource_group = provider.resource_group
self.resource_group_location = provider.resource_group_location
self._zk = None
@ -49,59 +37,23 @@ class AzureProvider(Provider):
self._zk = zk_conn
self.log.debug(
"Using %s as auth_path for Azure auth" % self.provider.auth_path)
if self.compute_client is None:
self.compute_client = self._get_compute_client()
if self.disks_client is None:
self.disks_client = self._get_disks_client()
if self.network_client is None:
self.network_client = self._get_network_client()
if self.resource_client is None:
self.resource_client = self._get_resource_client()
def _get_compute_client(self):
return get_client_from_auth_file(
ComputeManagementClient,
auth_path=self.provider.auth_path,
api_version=self.API_VERSION_COMPUTE
)
def _get_disks_client(self):
return get_client_from_auth_file(
ComputeManagementClient,
auth_path=self.provider.auth_path,
api_version=self.API_VERSION_DISKS
)
def _get_network_client(self):
return get_client_from_auth_file(
NetworkManagementClient,
auth_path=self.provider.auth_path,
api_version=self.API_VERSION_NETWORK
)
def _get_resource_client(self):
return get_client_from_auth_file(
ResourceManagementClient,
auth_path=self.provider.auth_path,
api_version=self.API_VERSION_RESOURCE
)
with open(self.provider.auth_path) as f:
self.azul = azul.AzureCloud(json.load(f))
def stop(self):
self.log.debug("Stopping")
def listNodes(self):
return self.compute_client.virtual_machines.list(self.resource_group)
return self.azul.virtual_machines.list(self.resource_group)
def listNICs(self):
return self.network_client.network_interfaces.list(self.resource_group)
return self.azul.network_interfaces.list(self.resource_group)
def listPIPs(self):
return self.network_client.public_ip_addresses.list(
self.resource_group)
return self.azul.public_ip_addresses.list(self.resource_group)
def listDisks(self):
return self.disks_client.disks.list_by_resource_group(
self.resource_group)
return self.azul.disks.list(self.resource_group)
def labelReady(self, name):
return True
@ -120,86 +72,89 @@ class AzureProvider(Provider):
def _cleanupLeakedDisks(self):
for disk in self.listDisks():
if disk.tags is None:
if disk['tags'] is None:
# Nothing to check ownership against, move on
continue
if 'nodepool_provider_name' not in disk.tags:
if 'nodepool_provider_name' not in disk['tags']:
continue
if disk.tags['nodepool_provider_name'] != self.provider.name:
if disk['tags']['nodepool_provider_name'] != self.provider.name:
# Another launcher, sharing this provider but configured
# with a different name, owns this.
continue
if not self._zk.getNode(disk.tags['nodepool_id']):
if not self._zk.getNode(disk['tags']['nodepool_id']):
self.log.warning(
"Marking for delete leaked Disk %s (%s) in %s "
"(unknown node id %s)",
disk.name, disk.id, self.provider.name,
disk.tags['nodepool_id']
disk['name'], disk['id'], self.provider.name,
disk['tags']['nodepool_id']
)
try:
self.disks_client.disks.delete(
self.azul.wait_for_async_operation(
self.azul.disks.delete(
self.resource_group,
disk.name).wait()
except CloudError as e:
disk['name']))
except azul.AzureError as e:
self.log.warning(
"Failed to cleanup Disk %s (%s). Error: %r",
disk.name, disk.id, e
disk['name'], disk['id'], e
)
def _cleanupLeakedNICs(self):
for nic in self.listNICs():
if nic.tags is None:
if nic['tags'] is None:
# Nothing to check ownership against, move on
continue
if 'nodepool_provider_name' not in nic.tags:
if 'nodepool_provider_name' not in nic['tags']:
continue
if nic.tags['nodepool_provider_name'] != self.provider.name:
if nic['tags']['nodepool_provider_name'] != self.provider.name:
# Another launcher, sharing this provider but configured
# with a different name, owns this.
continue
if not self._zk.getNode(nic.tags['nodepool_id']):
if not self._zk.getNode(nic['tags']['nodepool_id']):
self.log.warning(
"Marking for delete leaked NIC %s (%s) in %s "
"(unknown node id %s)",
nic.name, nic.id, self.provider.name,
nic.tags['nodepool_id']
nic['name'], nic['id'], self.provider.name,
nic['tags']['nodepool_id']
)
try:
self.network_client.network_interfaces.delete(
self.azul.wait_for_async_operation(
self.azul.network_interfaces.delete(
self.resource_group,
nic.name).wait()
except CloudError as e:
nic['name']))
except azul.AzureError as e:
self.log.warning(
"Failed to cleanup NIC %s (%s). Error: %r",
nic.name, nic.id, e
nic['name'], nic['id'], e
)
def _cleanupLeakedPIPs(self):
for pip in self.listPIPs():
if pip.tags is None:
if pip['tags'] is None:
# Nothing to check ownership against, move on
continue
if 'nodepool_provider_name' not in pip.tags:
if 'nodepool_provider_name' not in pip['tags']:
continue
if pip.tags['nodepool_provider_name'] != self.provider.name:
if pip['tags']['nodepool_provider_name'] != self.provider.name:
# Another launcher, sharing this provider but configured
# with a different name, owns this.
continue
if not self._zk.getNode(pip.tags['nodepool_id']):
if not self._zk.getNode(pip['tags']['nodepool_id']):
self.log.warning(
"Marking for delete leaked PIP %s (%s) in %s "
"(unknown node id %s)",
pip.name, pip.id, self.provider.name,
pip.tags['nodepool_id']
pip['name'], pip['id'], self.provider.name,
pip['tags']['nodepool_id']
)
try:
self.network_client.public_ip_addresses.delete(
self.azul.wait_for_async_operation(
self.azul.public_ip_addresses.delete(
self.resource_group,
pip.name).wait()
except CloudError as e:
pip['name']))
except azul.AzureError as e:
self.log.warning(
"Failed to cleanup IP %s (%s). Error: %r",
pip.name, pip.id, e
pip['name'], pip['id'], e
)
def _cleanupLeakedNodes(self):
@ -215,28 +170,28 @@ class AzureProvider(Provider):
deleting_nodes[node.provider].append(node.external_id)
for n in self.listNodes():
if n.tags is None:
if n['tags'] is None:
# Nothing to check ownership against, move on
continue
if 'nodepool_provider_name' not in n.tags:
if 'nodepool_provider_name' not in n['tags']:
continue
if n.tags['nodepool_provider_name'] != self.provider.name:
if n['tags']['nodepool_provider_name'] != self.provider.name:
# Another launcher, sharing this provider but configured
# with a different name, owns this.
continue
if (self.provider.name in deleting_nodes and
n.id in deleting_nodes[self.provider.name]):
n['id'] in deleting_nodes[self.provider.name]):
# Already deleting this node
continue
if not self._zk.getNode(n.tags['nodepool_id']):
if not self._zk.getNode(n['tags']['nodepool_id']):
self.log.warning(
"Marking for delete leaked instance %s (%s) in %s "
"(unknown node id %s)",
n.name, n.id, self.provider.name,
n.tags['nodepool_id']
n['name'], n['id'], self.provider.name,
n['tags']['nodepool_id']
)
node = zk.Node()
node.external_id = n.id
node.external_id = n['name']
node.provider = self.provider.name
node.state = zk.DELETING
self._zk.storeNode(node)
@ -244,9 +199,9 @@ class AzureProvider(Provider):
def cleanupNode(self, server_id):
self.log.debug('Server ID: %s' % server_id)
try:
vm = self.compute_client.virtual_machines.get(
self.resource_group, server_id.rsplit('/', 1)[1])
except CloudError as e:
vm = self.azul.virtual_machines.get(
self.resource_group, server_id)
except azul.AzureError as e:
if e.status_code == 404:
return
self.log.warning(
@ -254,40 +209,43 @@ class AzureProvider(Provider):
server_id, e
)
self.compute_client.virtual_machines.delete(
self.resource_group, server_id.rsplit('/', 1)[1]).wait()
self.azul.wait_for_async_operation(
self.azul.virtual_machines.delete(
self.resource_group, server_id))
nic_deletion = self.network_client.network_interfaces.delete(
self.resource_group, "%s-nic" % server_id.rsplit('/', 1)[1])
nic_deletion.wait()
self.azul.wait_for_async_operation(
self.azul.network_interfaces.delete(
self.resource_group, "%s-nic" % server_id))
pip_deletion = self.network_client.public_ip_addresses.delete(
self.resource_group, "%s-nic-pip" % server_id.rsplit('/', 1)[1])
pip_deletion.wait()
self.azul.wait_for_async_operation(
self.azul.public_ip_addresses.delete(
self.resource_group,
"%s-nic-pip" % server_id))
if self.provider.ipv6:
pip_deletion = self.network_client.public_ip_addresses.delete(
self.azul.wait_for_async_operation(
self.azul.public_ip_addresses.delete(
self.resource_group,
"%s-nic-v6-pip" % server_id.rsplit('/', 1)[1])
pip_deletion.wait()
"%s-nic-v6-pip" % server_id))
disk_handle_list = []
for disk in self.listDisks():
if disk.tags is not None and \
disk.tags.get('nodepool_id') == vm.tags['nodepool_id']:
async_disk_delete = self.disks_client.disks.delete(
self.resource_group, disk.name)
if disk['tags'] is not None and \
disk['tags'].get('nodepool_id') == vm['tags']['nodepool_id']:
async_disk_delete = self.azul.disks.delete(
self.resource_group, disk['name'])
disk_handle_list.append(async_disk_delete)
for async_disk_delete in disk_handle_list:
async_disk_delete.wait()
self.azul.wait_for_async_operation(
async_disk_delete)
def waitForNodeCleanup(self, server_id):
# All async tasks are handled in cleanupNode
return True
def getInstance(self, server_id):
return self.compute_client.virtual_machines.get(
self.resource_group, server_id, expand='instanceView')
return self.azul.virtual_machines.get(
self.resource_group, server_id)
def createInstance(
self, hostname, label, nodepool_id, nodepool_node_label=None):
@ -299,7 +257,7 @@ class AzureProvider(Provider):
if nodepool_node_label:
tags['nodepool_node_label'] = nodepool_node_label
self.resource_client.resource_groups.create_or_update(
self.azul.resource_groups.create(
self.resource_group, {
'location': self.provider.resource_group_location,
'tags': tags
@ -307,107 +265,120 @@ class AzureProvider(Provider):
tags['nodepool_id'] = nodepool_id
v4_params_create = {
'location': self.provider.location,
'public_ip_allocation_method': 'dynamic',
'tags': tags,
'properties': {
'publicIpAllocationMethod': 'dynamic',
},
}
v4_pip_poll = self.network_client.public_ip_addresses.create_or_update(
v4_public_ip = self.azul.public_ip_addresses.create(
self.resource_group,
"%s-nic-pip" % hostname,
v4_params_create,
)
v4_public_ip = v4_pip_poll.result()
nic_data = {
'location': self.provider.location,
'tags': tags,
'ip_configurations': [{
'name': "zuul-v4-ip-config",
'private_ip_address_version': 'IPv4',
'properties': {
'ipConfigurations': [{
'name': "nodepool-v4-ip-config",
'properties': {
'privateIpAddressVersion': 'IPv4',
'subnet': {
'id': self.provider.subnet_id
},
'public_ip_address': {
'id': v4_public_ip.id
'publicIpAddress': {
'id': v4_public_ip['id']
}
}
}]
}
}
if self.provider.ipv6:
nic_data['ip_configurations'].append({
nic_data['properties']['ipConfigurations'].append({
'name': "zuul-v6-ip-config",
'private_ip_address_version': 'IPv6',
'properties': {
'privateIpAddressVersion': 'IPv6',
'subnet': {
'id': self.provider.subnet_id
}
}
})
nic_creation = self.network_client.network_interfaces.create_or_update(
nic = self.azul.network_interfaces.create(
self.resource_group,
"%s-nic" % hostname,
nic_data
)
nic = nic_creation.result()
vm_creation = self.compute_client.virtual_machines.create_or_update(
vm = self.azul.virtual_machines.create(
self.resource_group, hostname, {
'location': self.provider.location,
'os_profile': {
'computer_name': hostname,
'admin_username': label.username,
'linux_configuration': {
'tags': tags,
'properties': {
'osProfile': {
'computerName': hostname,
'adminUsername': label.cloud_image.username,
'linuxConfiguration': {
'ssh': {
'public_keys': [{
'publicKeys': [{
'path': "/home/%s/.ssh/authorized_keys" % (
label.username),
'key_data': self.provider.zuul_public_key,
label.cloud_image.username),
'keyData': label.cloud_image.key,
}]
},
"disable_password_authentication": True,
"disablePasswordAuthentication": True,
}
},
'hardware_profile': {
'vmSize': label.hardwareProfile["vm-size"]
'hardwareProfile': {
'vmSize': label.hardware_profile["vm-size"]
},
'storage_profile': {'image_reference': label.imageReference},
'network_profile': {
'network_interfaces': [{
'id': nic.id,
'storageProfile': {
'imageReference': label.cloud_image.image_reference
},
'networkProfile': {
'networkInterfaces': [{
'id': nic['id'],
'properties': {
'primary': True,
}
}]
},
'tags': tags,
},
})
return vm_creation.result()
return vm
def getIpaddress(self, instance):
# Copied from https://github.com/Azure/azure-sdk-for-python/issues/897
ni_reference = instance.network_profile.network_interfaces[0]
ni_reference = ni_reference.id.split('/')
ni_reference = (instance['properties']['networkProfile']
['networkInterfaces'][0])
ni_reference = ni_reference['id'].split('/')
ni_group = ni_reference[4]
ni_name = ni_reference[8]
net_interface = self.network_client.network_interfaces.get(
net_interface = self.azul.network_interfaces.get(
ni_group, ni_name)
ip_reference = net_interface.ip_configurations[0].public_ip_address
ip_reference = ip_reference.id.split('/')
ip_reference = (net_interface['properties']['ipConfigurations'][0]
['properties']['publicIPAddress'])
ip_reference = ip_reference['id'].split('/')
ip_group = ip_reference[4]
ip_name = ip_reference[8]
public_ip = self.network_client.public_ip_addresses.get(
public_ip = self.azul.public_ip_addresses.get(
ip_group, ip_name)
public_ip = public_ip.ip_address
public_ip = public_ip['properties']['ipAddress']
return public_ip
def getv6Ipaddress(self, instance):
# Copied from https://github.com/Azure/azure-sdk-for-python/issues/897
ni_reference = instance.network_profile.network_interfaces[0]
ni_reference = ni_reference.id.split('/')
ni_reference = (instance['properties']['networkProfile']
['networkInterfaces'][0])
ni_reference = ni_reference['id'].split('/')
ni_group = ni_reference[4]
ni_name = ni_reference[8]
net_interface = self.network_client.network_interfaces.get(
net_interface = self.azul.network_interfaces.get(
ni_group, ni_name)
return net_interface.ip_configurations[1].private_ip_address
return (net_interface['properties']['ipConfigurations'][1]
['properties']['privateIPAddress'])

View File

@ -375,12 +375,25 @@ class DBTestCase(BaseTestCase):
self.log = logging.getLogger("tests")
self.setupZK()
def setup_config(self, filename, images_dir=None, context_name=None):
def setup_config(self, filename, images_dir=None, **kw):
if images_dir is None:
images_dir = fixtures.TempDir()
self.useFixture(images_dir)
build_log_dir = fixtures.TempDir()
self.useFixture(build_log_dir)
format_dict = dict(
images_dir=images_dir.path,
build_log_dir=build_log_dir.path,
zookeeper_host=self.zookeeper_host,
zookeeper_port=self.zookeeper_port,
zookeeper_chroot=self.zookeeper_chroot,
zookeeper_ca=self.zookeeper_ca,
zookeeper_cert=self.zookeeper_cert,
zookeeper_key=self.zookeeper_key
)
format_dict.update(kw)
if filename.startswith('/'):
path = filename
else:
@ -389,15 +402,7 @@ class DBTestCase(BaseTestCase):
(fd, path) = tempfile.mkstemp()
with open(configfile, 'rb') as conf_fd:
config = conf_fd.read().decode('utf8')
data = config.format(images_dir=images_dir.path,
build_log_dir=build_log_dir.path,
context_name=context_name,
zookeeper_host=self.zookeeper_host,
zookeeper_port=self.zookeeper_port,
zookeeper_chroot=self.zookeeper_chroot,
zookeeper_ca=self.zookeeper_ca,
zookeeper_cert=self.zookeeper_cert,
zookeeper_key=self.zookeeper_key)
data = config.format(**format_dict)
os.write(fd, data.encode('utf8'))
os.close(fd)
self._config_images_dir = images_dir

View File

@ -3,8 +3,9 @@ webapp:
listen_address: '0.0.0.0'
zookeeper-servers:
- host: 127.0.0.1
port: 2181
- host: {zookeeper_host}
port: {zookeeper_port}
chroot: {zookeeper_chroot}
zookeeper-tls:
ca: {zookeeper_ca}
@ -21,9 +22,9 @@ providers:
zuul-public-key: ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC+mplenM+m6pNY9Un3fpO9eqf808Jrfb3d1gXg7BZVawCvtEZ/cDYvLQ3OF1AeL2kcIC0UAIglM5JXae7yO5CJbJRdkbXvv0u1LvpLxYSPM4ATR0r4IseC5YVxkfJQNi4ixSwTqD4ScEkuCXcSqSU9M+hB+KlnwXoR4IcYHf7vD2Z0Mdwm2ikk3SeERmspmMxx/uz0SPn58QxONuoTlNWQKqDWsV6bRyoPa6HWccMrIH1/e7E69Nw/30oioOQpKBgaDCauh+QkDtSkjRpRMOV47ZFh16Q9DqMgLx+FD8z6++9rsHlB65Zas1xyQsiRCFG09s00b7OR7Xz9ukQ5+vXV
resource-group-location: centralus
location: centralus
resource-group: ZuulCI
auth-path: /etc/nodepool/azurecredentials.json
subnet-id: /subscriptions/c35cf7df-ed75-4c85-be00-535409a85120/resourceGroups/ZuulCI/providers/Microsoft.Network/virtualNetworks/NodePool/subnets/default
resource-group: nodepool
auth-path: {auth_path}
subnet-id: /subscriptions/c35cf7df-ed75-4c85-be00-535409a85120/resourceGroups/nodepool/providers/Microsoft.Network/virtualNetworks/NodePool/subnets/default
cloud-images:
- name: bionic
username: zuul

View File

@ -0,0 +1,270 @@
# Copyright (C) 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import time
import os
import re
import tempfile
import urllib
import uuid
import fixtures
import responses
class CRUDManager:
name = ''
def __init__(self, cloud):
self.cloud = cloud
self.items = []
def list(self, request):
resp = {'value': self.items}
return (200, {}, json.dumps(resp))
def get(self, request):
url = urllib.parse.urlparse(request.path_url)
for item in self.items:
if item['id'] == url.path:
return (200, {}, json.dumps(item))
return (404, {}, json.dumps({'error': {'message': 'Not Found'}}))
class ResourceGroupsCRUD(CRUDManager):
name = "resourcegroups"
def put(self, request):
data = json.loads(request.body)
url = urllib.parse.urlparse(request.path_url)
name = url.path.split('/')[-1]
data['id'] = url.path
data['name'] = name
data['type'] = "Microsoft.Resources/resourceGroups"
data['provisioningState'] = 'Succeeded'
self.items.append(data)
return (200, {}, json.dumps(data))
class PublicIPAddressesCRUD(CRUDManager):
name = "Microsoft.Network/publicIPAddresses"
def put(self, request):
data = json.loads(request.body)
url = urllib.parse.urlparse(request.path_url)
name = url.path.split('/')[-1]
data['id'] = url.path
data['name'] = name
data['type'] = self.name
data['properties'] = {
"provisioningState": "Updating",
"resourceGuid": str(uuid.uuid4()),
"publicIPAddressVersion": "IPv4",
"publicIPAllocationMethod": "Dynamic",
"idleTimeoutInMinutes": 4,
"ipTags": []
}
self.items.append(data)
ret = json.dumps(data)
# Finish provisioning after return
data['properties']['ipAddress'] = "fake"
data['properties']['provisioningState'] = "Succeeded"
return (200, {}, ret)
class NetworkInterfacesCRUD(CRUDManager):
name = "Microsoft.Network/networkInterfaces"
def put(self, request):
data = json.loads(request.body)
url = urllib.parse.urlparse(request.path_url)
name = url.path.split('/')[-1]
data['id'] = url.path
data['name'] = name
data['type'] = self.name
ipconfig = data['properties']['ipConfigurations'][0]
data['properties'] = {
"provisioningState": "Succeeded",
"resourceGuid": str(uuid.uuid4()),
"ipConfigurations": [
{
"name": ipconfig['name'],
"id": os.path.join(data['id'], ipconfig['name']),
"type": ("Microsoft.Network/networkInterfaces/"
"ipConfigurations"),
"properties": {
"provisioningState": "Succeeded",
"privateIPAddress": "10.0.0.4",
"privateIPAllocationMethod": "Dynamic",
"publicIPAddress": (ipconfig['properties']
['publicIpAddress']),
"subnet": ipconfig['properties']['subnet'],
"primary": True,
"privateIPAddressVersion": "IPv4",
},
}
],
"enableAcceleratedNetworking": False,
"enableIPForwarding": False,
"hostedWorkloads": [],
"tapConfigurations": [],
"nicType": "Standard"
}
self.items.append(data)
return (200, {}, json.dumps(data))
class VirtualMachinesCRUD(CRUDManager):
name = "Microsoft.Compute/virtualMachines"
def put(self, request):
data = json.loads(request.body)
url = urllib.parse.urlparse(request.path_url)
name = url.path.split('/')[-1]
data['id'] = url.path
data['name'] = name
data['type'] = self.name
data['properties'] = {
"vmId": str(uuid.uuid4()),
"hardwareProfile": data['properties']['hardwareProfile'],
"storageProfile": {
"imageReference": (data['properties']['storageProfile']
['imageReference']),
"osDisk": {
"osType": "Linux",
"createOption": "FromImage",
"caching": "ReadWrite",
"managedDisk": {
"storageAccountType": "Premium_LRS"
},
"diskSizeGB": 30
},
"dataDisks": []
},
"osProfile": data['properties']['osProfile'],
"networkProfile": data['properties']['networkProfile'],
"provisioningState": "Creating"
}
self.items.append(data)
disk_data = data.copy()
disk_data['name'] = 'bionic-azure-' + str(uuid.uuid4())
disk_data['type'] = "Microsoft.Compute/disks"
disk_data['id'] = '/'.join(url.path.split('/')[:5] +
[disk_data['type'], disk_data['name']])
disk_data['properties'] = {"provisioningState": "Succeeded"}
self.cloud.crud["Microsoft.Compute/disks"].items.append(disk_data)
ret = json.dumps(data)
# Finish provisioning after return
data['properties']['provisioningState'] = "Succeeded"
return (200, {}, ret)
class DisksCRUD(CRUDManager):
name = "Microsoft.Compute/disks"
def put(self, request):
data = json.loads(request.body)
url = urllib.parse.urlparse(request.path_url)
name = url.path.split('/')[-1]
data['id'] = url.path
data['name'] = name
data['type'] = self.name
data['properties'] = {
"provisioningState": "Succeeded",
}
self.items.append(data)
return (200, {}, json.dumps(data))
class FakeAzureFixture(fixtures.Fixture):
tenant_id = str(uuid.uuid4())
subscription_id = str(uuid.uuid4())
access_token = "secret_token"
auth = {
"clientId": str(uuid.uuid4()),
"clientSecret": str(uuid.uuid4()),
"subscriptionId": subscription_id,
"tenantId": tenant_id,
"activeDirectoryEndpointUrl": "https://login.microsoftonline.com",
"resourceManagerEndpointUrl": "https://management.azure.com/",
"activeDirectoryGraphResourceId": "https://graph.windows.net/",
"sqlManagementEndpointUrl":
"https://management.core.windows.net:8443/",
"galleryEndpointUrl": "https://gallery.azure.com/",
"managementEndpointUrl": "https://management.core.windows.net/",
}
def _setUp(self):
self.crud = {}
self.responses = responses.RequestsMock()
self.responses.start()
self.auth_file = tempfile.NamedTemporaryFile('w', delete=False)
with self.auth_file as f:
json.dump(self.auth, f)
self.responses.add(
responses.POST,
f'https://login.microsoftonline.com/{self.tenant_id}/oauth2/token',
json={
'access_token': 'secret_token',
'expires_on': time.time() + 600,
})
self._setup_crud(ResourceGroupsCRUD, '2020-06-01',
resource_grouped=False)
self._setup_crud(VirtualMachinesCRUD, '2020-12-01')
self._setup_crud(NetworkInterfacesCRUD, '2020-07-01')
self._setup_crud(PublicIPAddressesCRUD, '2020-07-01')
self._setup_crud(DisksCRUD, '2020-06-30')
self.addCleanup(self.responses.stop)
self.addCleanup(self.responses.reset)
def _setup_crud(self, manager, api_version, resource_grouped=True):
self.crud[manager.name] = manager(self)
if resource_grouped:
rg = 'resourceGroups/(.*?)/providers/'
else:
rg = ''
list_re = re.compile(
'https://management.azure.com/subscriptions/'
+ f'{self.subscription_id}/'
+ rg + f'{manager.name}?\\?api-version={api_version}')
crud_re = re.compile(
'https://management.azure.com/subscriptions/'
+ f'{self.subscription_id}/'
+ rg + f'{manager.name}/(.*?)?\\?api-version={api_version}')
self.responses.add_callback(
responses.GET, list_re, callback=self.crud[manager.name].list,
content_type='application/json')
self.responses.add_callback(
responses.GET, crud_re, callback=self.crud[manager.name].get,
content_type='application/json')
self.responses.add_callback(
responses.PUT, crud_re, callback=self.crud[manager.name].put,
content_type='application/json')
def _extract_resource_group(self, path):
url = re.compile('/subscriptions/(.*?)/resourceGroups/(.*?)/')
m = url.match(path)
return m.group(2)

View File

@ -1,4 +1,5 @@
# Copyright (C) 2018 Red Hat
# Copyright (C) 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -13,67 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import fixtures
import logging
import os
import tempfile
from unittest.mock import MagicMock
import yaml
from nodepool import tests
from nodepool import zk
from nodepool import nodeutils as utils
from nodepool.driver.azure import provider, AzureProvider
from azure.common.client_factory import get_client_from_json_dict
from azure.mgmt.resource.resources.v2019_10_01.operations import ResourceGroupsOperations # noqa
from azure.mgmt.network.v2020_03_01.operations import PublicIPAddressesOperations # noqa
from azure.mgmt.network.v2020_03_01.operations import NetworkInterfacesOperations # noqa
from azure.mgmt.compute.v2019_12_01.operations import VirtualMachinesOperations
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.compute import ComputeManagementClient
auth = {
"clientId": "ad735158-65ca-11e7-ba4d-ecb1d756380e",
"clientSecret": "b70bb224-65ca-11e7-810c-ecb1d756380e",
"subscriptionId": "bfc42d3a-65ca-11e7-95cf-ecb1d756380e",
"tenantId": "c81da1d8-65ca-11e7-b1d1-ecb1d756380e",
"activeDirectoryEndpointUrl": "https://login.microsoftonline.com",
"resourceManagerEndpointUrl": "https://management.azure.com/",
"activeDirectoryGraphResourceId": "https://graph.windows.net/",
"sqlManagementEndpointUrl": "https://management.core.windows.net:8443/",
"galleryEndpointUrl": "https://gallery.azure.com/",
"managementEndpointUrl": "https://management.core.windows.net/",
}
class FakeAzureResource:
def __init__(self, id_, provisioning_state='Unknown'):
self.id = id_
self.provisioning_state = provisioning_state
class FakePIPResult:
@staticmethod
def result():
return FakeAzureResource('fake_pip_id')
class FakeNICResult:
@staticmethod
def result():
return FakeAzureResource('fake_nic_id')
class FakeVMResult:
@staticmethod
def result():
return FakeAzureResource('fake_vm_id', provisioning_state='Succeeded')
from . import fake_azure
class TestDriverAzure(tests.DBTestCase):
@ -82,103 +28,13 @@ class TestDriverAzure(tests.DBTestCase):
def setUp(self):
super().setUp()
self.useFixture(fixtures.MockPatchObject(
provider.AzureProvider, 'cleanupLeakedResources',
MagicMock()))
self.useFixture(fixtures.MockPatchObject(
provider.AzureProvider, 'cleanupNode',
MagicMock()))
self.useFixture(fixtures.MockPatchObject(
provider.AzureProvider, 'getIpaddress',
MagicMock(return_value="127.0.0.1")))
self.useFixture(fixtures.MockPatchObject(
provider.AzureProvider, '_get_compute_client',
MagicMock(
return_value=get_client_from_json_dict(
ComputeManagementClient, auth, credentials={},
api_version=AzureProvider.API_VERSION_COMPUTE
)
)
))
self.useFixture(fixtures.MockPatchObject(
provider.AzureProvider, '_get_disks_client',
MagicMock(
return_value=get_client_from_json_dict(
ComputeManagementClient, auth, credentials={},
api_version=AzureProvider.API_VERSION_DISKS
)
)
))
self.useFixture(fixtures.MockPatchObject(
utils, 'nodescan',
MagicMock(return_value="FAKE_KEY")))
self.useFixture(fixtures.MockPatchObject(
provider.AzureProvider, '_get_network_client',
MagicMock(
return_value=get_client_from_json_dict(
NetworkManagementClient, auth, credentials={},
api_version=AzureProvider.API_VERSION_NETWORK
)
)
))
self.useFixture(fixtures.MockPatchObject(
provider.AzureProvider, '_get_resource_client',
MagicMock(
return_value=get_client_from_json_dict(
ResourceManagementClient, auth, credentials={},
api_version=AzureProvider.API_VERSION_RESOURCE
)
)
))
self.useFixture(fixtures.MockPatchObject(
ResourceGroupsOperations, 'create_or_update',
MagicMock(
return_value=FakeAzureResource('fake_rg_id'))
))
self.useFixture(fixtures.MockPatchObject(
PublicIPAddressesOperations, 'create_or_update',
MagicMock(return_value=FakePIPResult())
))
self.useFixture(fixtures.MockPatchObject(
NetworkInterfacesOperations, 'create_or_update',
MagicMock(return_value=FakeNICResult())
))
self.useFixture(fixtures.MockPatchObject(
VirtualMachinesOperations, 'create_or_update',
MagicMock(return_value=FakeVMResult())
))
self.fake_azure = fake_azure.FakeAzureFixture()
self.useFixture(self.fake_azure)
def test_azure_machine(self):
az_template = os.path.join(
os.path.dirname(__file__), '..', 'fixtures', 'azure.yaml')
with open(az_template) as f:
raw_config = yaml.safe_load(f)
raw_config['zookeeper-servers'][0] = {
'host': self.zookeeper_host,
'port': self.zookeeper_port,
'chroot': self.zookeeper_chroot,
}
raw_config['zookeeper-tls'] = {
'ca': self.zookeeper_ca,
'cert': self.zookeeper_cert,
'key': self.zookeeper_key,
}
with tempfile.NamedTemporaryFile() as tf:
tf.write(yaml.safe_dump(
raw_config, default_flow_style=False).encode('utf-8'))
tf.flush()
configfile = self.setup_config(tf.name)
configfile = self.setup_config(
'azure.yaml',
auth_path=self.fake_azure.auth_file.name)
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
req = zk.NodeRequest()
@ -195,4 +51,3 @@ class TestDriverAzure(tests.DBTestCase):
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
self.assertIsNone(node.shell_type)

View File

@ -0,0 +1,7 @@
---
upgrade:
- |
The ``zuul-public-key`` configuration attribute in the
``providers`` Azure driver has been moved and renamed. Please
move this setting to its new location at
:attr:`providers.[azure].cloud-images.key`

View File

@ -7,3 +7,4 @@ stestr>=1.0.0 # Apache-2.0
testscenarios
testtools>=0.9.27
moto
responses>=0.12.1