Drive state machine in launcher

This adds some skeleton code to create a state machine in the
launcher and drive it.  A number of features in the AWS driver
still need configuration settings added, so they have been commented
out with "TODO" comments to add them back in as the configuration
matures.

This adds stub attributes to the node to store the state machine
data, but does not yet serialize it to ZK.

The test_launcher test is removed since it requires the full AWS
test setup.  That functionality is covered by test_jobs_executed
in test_aws_driver.

Change-Id: I6f2404afddcc5373016bcadefb3cb97857380a16
This commit is contained in:
James E. Blair
2024-07-22 15:25:13 -07:00
parent 96d364f24c
commit e09725fddc
7 changed files with 204 additions and 165 deletions

View File

@@ -112,6 +112,8 @@
launch-timeout: 600
object-storage:
bucket-name: zuul
# TODO
# key-name: zuul
flavors:
- name: normal
instance-type: t3.medium
@@ -119,6 +121,7 @@
instance-type: t3.large
images:
- name: debian
image-id: ami-1e749f67
- section:
name: aws-us-east-1
@@ -129,5 +132,7 @@
name: aws-us-east-1-main
section: aws-us-east-1
labels:
- debian-normal
- debian-large
- name: debian-normal
key-name: zuul
- name: debian-large
key-name: zuul

View File

@@ -84,48 +84,6 @@ class TestLauncher(ZuulTestCase):
dict(name='build-debian-local-image', result='SUCCESS'),
])
@simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
def test_launcher(self):
result_queue = PipelineResultEventQueue(
self.zk_client, "tenant-one", "check")
labels = ["debian-normal", "debian-large"]
ctx = self.createZKContext(None)
# Lock the pipeline, so we can grab the result event
with pipeline_lock(self.zk_client, "tenant-one", "check"):
request = model.NodesetRequest.new(
ctx,
tenant_name="tenant-one",
pipeline_name="check",
buildset_uuid=uuid.uuid4().hex,
job_uuid=uuid.uuid4().hex,
job_name="foobar",
labels=labels,
priority=100,
request_time=time.time(),
zuul_event_id=uuid.uuid4().hex,
span_info=None,
)
for _ in iterate_timeout(10, "nodeset request to be fulfilled"):
result_events = list(result_queue)
if result_events:
for event in result_events:
# Remove event(s) from queue
result_queue.ack(event)
break
self.assertEqual(len(result_events), 1)
for event in result_queue:
self.assertEqual(event.request_id, request.uuid)
self.assertEqual(event.build_set_uuid, request.buildset_uuid)
request.refresh(ctx)
self.assertEqual(request.state, model.NodesetRequest.State.FULFILLED)
self.assertEqual(len(request.provider_nodes), 2)
request.delete(ctx)
self.waitUntilSettled()
@simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
def test_launcher_missing_label(self):
result_queue = PipelineResultEventQueue(

View File

@@ -15,7 +15,6 @@
import base64
import cachetools.func
import copy
import functools
import hashlib
import json
@@ -122,24 +121,19 @@ class AwsCreateStateMachine(statemachine.StateMachine):
INSTANCE_CREATING = 'creating instance'
COMPLETE = 'complete'
def __init__(self, endpoint, hostname, label, image_external_id,
metadata, request, log):
def __init__(self, endpoint, hostname, label, flavor, image,
image_external_id, tags, log):
self.log = log
super().__init__()
self.endpoint = endpoint
self.attempts = 0
self.image_external_id = image_external_id
self.metadata = metadata
self.tags = label.tags.copy() or {}
for k, v in label.dynamic_tags.items():
try:
self.tags[k] = v.format(request=request.getSafeAttributes())
except Exception:
self.log.exception("Error formatting tag %s", k)
self.tags.update(metadata)
self.tags = tags.copy()
self.tags['Name'] = hostname
self.hostname = hostname
self.label = label
self.flavor = flavor
self.image = image
self.public_ipv4 = None
self.public_ipv6 = None
self.nic = None
@@ -150,7 +144,7 @@ class AwsCreateStateMachine(statemachine.StateMachine):
def advance(self):
if self.state == self.START:
if self.label.dedicated_host:
if self.flavor.dedicated_host:
self.state = self.HOST_ALLOCATING_START
else:
self.state = self.INSTANCE_CREATING_START
@@ -187,7 +181,7 @@ class AwsCreateStateMachine(statemachine.StateMachine):
if self.state == self.INSTANCE_CREATING_START:
self.create_future = self.endpoint._submitCreateInstance(
self.label, self.image_external_id,
self.label, self.flavor, self.image, self.image_external_id,
self.tags, self.hostname, self.dedicated_host_id, self.log)
self.state = self.INSTANCE_CREATING_SUBMIT
@@ -198,7 +192,8 @@ class AwsCreateStateMachine(statemachine.StateMachine):
return
self.instance = instance
self.external_id['instance'] = instance['InstanceId']
self.quota = self.endpoint.getQuotaForLabel(self.label)
self.quota = self.endpoint.getQuotaForLabel(
self.label, self.flavor)
self.state = self.INSTANCE_CREATING
if self.state == self.INSTANCE_CREATING:
@@ -545,24 +540,25 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
yield AwsInstance(self.region, instance, None, quota)
def getQuotaForLabel(self, label):
def getQuotaForLabel(self, label, flavor):
# For now, we are optimistically assuming that when an
# instance is launched on a dedicated host, it is not counted
# against instance quota. That may be overly optimistic. If
# it is, then we will merge the two quotas below rather than
# switch.
if label.dedicated_host:
if flavor.dedicated_host:
quota = self._getQuotaForHostType(
label.instance_type)
flavor.instance_type)
else:
quota = self._getQuotaForInstanceType(
label.instance_type,
SPOT if label.use_spot else ON_DEMAND)
if label.volume_type:
quota.add(self._getQuotaForVolumeType(
label.volume_type,
storage=label.volume_size,
iops=label.iops))
flavor.instance_type,
SPOT if flavor.use_spot else ON_DEMAND)
# TODO
# if label.volume_type:
# quota.add(self._getQuotaForVolumeType(
# label.volume_type,
# storage=label.volume_size,
# iops=label.iops))
return quota
def uploadImage(self, provider_image, image_name, filename,
@@ -1232,9 +1228,9 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
self.image_id_by_filter_cache[cache_key] = val
return val
def _getImageId(self, cloud_image):
image_id = cloud_image.image_id
image_filters = cloud_image.image_filters
def _getImageId(self, image):
image_id = image.image_id
image_filters = image.image_filters
if image_filters is not None:
return self._getLatestImageIdByFilters(image_filters)
@@ -1298,11 +1294,12 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
return dict(HostId=host_ids[0],
State='pending')
def _submitCreateInstance(self, label, image_external_id,
tags, hostname, dedicated_host_id, log):
def _submitCreateInstance(self, label, flavor, image,
image_external_id, tags, hostname,
dedicated_host_id, log):
return self.create_executor.submit(
self._createInstance,
label, image_external_id,
label, flavor, image, image_external_id,
tags, hostname, dedicated_host_id, log)
def _completeCreateInstance(self, future):
@@ -1323,22 +1320,22 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
raise exceptions.CapacityException(str(error))
raise
def _createInstance(self, label, image_external_id,
def _createInstance(self, label, flavor, image, image_external_id,
tags, hostname, dedicated_host_id, log):
if image_external_id:
image_id = image_external_id
else:
image_id = self._getImageId(label.cloud_image)
image_id = self._getImageId(image)
args = dict(
ImageId=image_id,
MinCount=1,
MaxCount=1,
KeyName=label.key_name,
EbsOptimized=label.ebs_optimized,
InstanceType=label.instance_type,
EbsOptimized=flavor.ebs_optimized,
InstanceType=flavor.instance_type,
NetworkInterfaces=[{
'AssociatePublicIpAddress': label.pool.public_ipv4,
'AssociatePublicIpAddress': flavor.public_ipv4,
'DeviceIndex': 0}],
TagSpecifications=[
{
@@ -1352,28 +1349,29 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
]
)
if label.pool.security_group_id:
args['NetworkInterfaces'][0]['Groups'] = [
label.pool.security_group_id
]
if label.pool.subnet_id:
args['NetworkInterfaces'][0]['SubnetId'] = label.pool.subnet_id
# TODO:
# if label.pool.security_group_id:
# args['NetworkInterfaces'][0]['Groups'] = [
# label.pool.security_group_id
# ]
# if label.pool.subnet_id:
# args['NetworkInterfaces'][0]['SubnetId'] = label.pool.subnet_id
if label.pool.public_ipv6:
args['NetworkInterfaces'][0]['Ipv6AddressCount'] = 1
# if label.pool.public_ipv6:
# args['NetworkInterfaces'][0]['Ipv6AddressCount'] = 1
if label.userdata:
args['UserData'] = label.userdata
# if label.userdata:
# args['UserData'] = label.userdata
if label.iam_instance_profile:
if 'name' in label.iam_instance_profile:
args['IamInstanceProfile'] = {
'Name': label.iam_instance_profile['name']
}
elif 'arn' in label.iam_instance_profile:
args['IamInstanceProfile'] = {
'Arn': label.iam_instance_profile['arn']
}
# if label.iam_instance_profile:
# if 'name' in label.iam_instance_profile:
# args['IamInstanceProfile'] = {
# 'Name': label.iam_instance_profile['name']
# }
# elif 'arn' in label.iam_instance_profile:
# args['IamInstanceProfile'] = {
# 'Arn': label.iam_instance_profile['arn']
# }
# Default block device mapping parameters are embedded in AMIs.
# We might need to supply our own mapping before launching the instance.
@@ -1383,45 +1381,48 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
# TODO: Flavors can also influence whether or not the VM spawns with a
# volume -- we basically need to ensure DeleteOnTermination is true.
# However, leaked volume detection may mitigate this.
if image.get('BlockDeviceMappings'):
bdm = image['BlockDeviceMappings']
mapping = copy.deepcopy(bdm[0])
if 'Ebs' in mapping:
mapping['Ebs']['DeleteOnTermination'] = True
if label.volume_size:
mapping['Ebs']['VolumeSize'] = label.volume_size
if label.volume_type:
mapping['Ebs']['VolumeType'] = label.volume_type
if label.iops:
mapping['Ebs']['Iops'] = label.iops
if label.throughput:
mapping['Ebs']['Throughput'] = label.throughput
# If the AMI is a snapshot, we cannot supply an "encrypted"
# parameter
if 'Encrypted' in mapping['Ebs']:
del mapping['Ebs']['Encrypted']
args['BlockDeviceMappings'] = [mapping]
# enable EC2 Spot
if label.use_spot:
args['InstanceMarketOptions'] = {
'MarketType': 'spot',
'SpotOptions': {
'SpotInstanceType': 'one-time',
'InstanceInterruptionBehavior': 'terminate'
}
}
# TODO
# if image.get('BlockDeviceMappings'):
# bdm = image['BlockDeviceMappings']
# mapping = copy.deepcopy(bdm[0])
# if 'Ebs' in mapping:
# mapping['Ebs']['DeleteOnTermination'] = True
# if label.volume_size:
# mapping['Ebs']['VolumeSize'] = label.volume_size
# if label.volume_type:
# mapping['Ebs']['VolumeType'] = label.volume_type
# if label.iops:
# mapping['Ebs']['Iops'] = label.iops
# if label.throughput:
# mapping['Ebs']['Throughput'] = label.throughput
# # If the AMI is a snapshot, we cannot supply an "encrypted"
# # parameter
# if 'Encrypted' in mapping['Ebs']:
# del mapping['Ebs']['Encrypted']
# args['BlockDeviceMappings'] = [mapping]
if label.imdsv2 == 'required':
args['MetadataOptions'] = {
'HttpTokens': 'required',
'HttpEndpoint': 'enabled',
}
elif label.imdsv2 == 'optional':
args['MetadataOptions'] = {
'HttpTokens': 'optional',
'HttpEndpoint': 'enabled',
}
# TODO
# if label.use_spot:
# args['InstanceMarketOptions'] = {
# 'MarketType': 'spot',
# 'SpotOptions': {
# 'SpotInstanceType': 'one-time',
# 'InstanceInterruptionBehavior': 'terminate'
# }
# }
# TODO
# if label.imdsv2 == 'required':
# args['MetadataOptions'] = {
# 'HttpTokens': 'required',
# 'HttpEndpoint': 'enabled',
# }
# elif label.imdsv2 == 'optional':
# args['MetadataOptions'] = {
# 'HttpTokens': 'optional',
# 'HttpEndpoint': 'enabled',
# }
if dedicated_host_id:
placement = args.setdefault('Placement', {})
@@ -1431,9 +1432,10 @@ class AwsProviderEndpoint(BaseProviderEndpoint):
'Affinity': 'host',
})
if label.pool.az:
placement = args.setdefault('Placement', {})
placement['AvailabilityZone'] = label.pool.az
# TODO
# if label.pool.az:
# placement = args.setdefault('Placement', {})
# placement['AvailabilityZone'] = label.pool.az
with self.rate_limiter(log.debug, "Created instance"):
log.debug("Creating VM %s", hostname)

View File

@@ -37,7 +37,10 @@ from zuul.provider import (
class AwsProviderImage(BaseProviderImage):
pass
def __init__(self, config):
super().__init__(config)
self.image_id = config.get('image-id')
self.image_filters = config.get('image-filters')
class AwsProviderFlavor(BaseProviderFlavor):
@@ -46,6 +49,9 @@ class AwsProviderFlavor(BaseProviderFlavor):
self.instance_type = config['instance-type']
self.volume_type = config.get('volume-type')
self.dedicated_host = config.get('dedicated-host', False)
self.ebs_optimized = bool(config.get('ebs-optimized', False))
# TODO
self.use_spot = False
class AwsProviderLabel(BaseProviderLabel):
@@ -95,11 +101,22 @@ class AwsProvider(BaseProvider, subclass_id='aws'):
def getEndpoint(self):
return self.driver.getEndpoint(self)
def getCreateStateMachine(self, hostname, label, image_external_id,
metadata, request, az, log):
return AwsCreateStateMachine(self.endpoint, hostname, label,
image_external_id, metadata,
request, log)
def getCreateStateMachine(self, node, image_external_id, log):
# TODO: decide on a method of producing a hostname
# that is max 15 chars.
hostname = f"np{node.uuid[:13]}"
label = self.labels[node.label]
flavor = self.flavors[label.flavor]
image = self.images[label.image]
return AwsCreateStateMachine(
self.endpoint,
hostname,
label,
flavor,
image,
image_external_id,
node.tags,
log)
def getDeleteStateMachine(self, external_id, log):
return AwsDeleteStateMachine(self.endpoint, external_id, log)
@@ -188,7 +205,8 @@ class AwsProvider(BaseProvider, subclass_id='aws'):
return QuotaInformation(**args)
def getQuotaForLabel(self, label):
return self.endpoint.getQuotaForLabel(label)
flavor = self.flavors[label.flavor]
return self.endpoint.getQuotaForLabel(label, flavor)
def uploadImage(self, provider_image, image_name,
filename, image_format, metadata, md5, sha256):
@@ -231,6 +249,7 @@ class AwsProviderSchema(BaseProviderSchema):
vs.Required('instance-type'): str,
'volume-type': str,
'dedicated-host': bool,
'ebs-optimized': bool,
})
return schema

View File

@@ -37,6 +37,7 @@ from zuul.zk.layout import (
LayoutStateStore,
)
from zuul.zk.locks import tenant_read_lock
from zuul.zk.system import ZuulSystem
from zuul.zk.zkobject import ZKContext
COMMANDS = (
@@ -67,6 +68,7 @@ class Launcher:
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self.system = ZuulSystem(self.zk_client)
self.trigger_events = TenantTriggerEventQueue.createRegistry(
self.zk_client, self.connections
)
@@ -176,21 +178,26 @@ class Launcher:
provider_nodes = []
label_providers = self._selectProviders(request)
with self.createZKContext(request._lock, log) as ctx:
for i, (label, provider) in enumerate(label_providers):
for i, (label_name, provider) in enumerate(label_providers):
# Create a deterministic node UUID by using
# the request UUID as namespace.
node_uuid = uuid.uuid5(
request_uuid, f"{provider.name}-{i}-{label}").hex
request_uuid, f"{provider.name}-{i}-{label_name}").hex
label = provider.labels[label_name]
tags = provider.getNodeTags(
self.system.system_id, request, provider, label,
node_uuid)
# TODO: handle NodeExists errors
node_class = provider.driver.getProviderNodeClass()
node = node_class.new(
ctx,
uuid=node_uuid,
label=label,
label=label_name,
request_id=request.uuid,
connection_name=provider.connection_name,
tenant_name=request.tenant_name,
provider=provider.name,
tags=tags,
)
log.debug("Requested node %s", node)
provider_nodes.append(node.uuid)
@@ -257,9 +264,10 @@ class Launcher:
self._buildNode(node, log)
if node.state == model.ProviderNode.State.BUILDING:
self._checkNode(node, log)
except ProviderNodeError:
except ProviderNodeError as err:
state = model.ProviderNode.State.FAILED
log.exception("Marking node %s as %s", node, state)
log.exception("Marking node %s as %s: %s", node,
state, err)
with self.createZKContext(node._lock, self.log) as ctx:
node.updateAttributes(ctx, state=state)
@@ -270,17 +278,22 @@ class Launcher:
log.debug("Building node %s", node)
provider = self._getProvider(node.tenant_name, node.provider)
_ = provider.getEndpoint()
# TODO: launch a node
with self.createZKContext(node._lock, self.log) as ctx:
node.updateAttributes(ctx, state=model.ProviderNode.State.BUILDING)
with node.activeContext(ctx):
# TODO: this may be provided by Zuul once image
# uploads are supported
image_external_id = None
node.create_state_machine = provider.getCreateStateMachine(
node, image_external_id, log)
node.state = model.ProviderNode.State.BUILDING
def _checkNode(self, node, log):
log.debug("Checking node %s", node)
# TODO: implement node check
if True:
state = model.ProviderNode.State.READY
else:
state = model.ProviderNode.State.FAILED
node.create_state_machine.advance()
if not node.create_state_machine.complete:
self.wake_event.set()
return
state = model.ProviderNode.State.READY
log.debug("Marking node %s as %s", node, state)
with self.createZKContext(node._lock, self.log) as ctx:
node.updateAttributes(ctx, state=state)

View File

@@ -2323,6 +2323,7 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin,
label="",
connection_name="",
# Node data
hostname=None,
host_id=None,
interface_ip=None,
public_ipv4=None,
@@ -2341,8 +2342,13 @@ class ProviderNode(zkobject.PolymorphicZKObjectMixin,
resources=None,
attributes={},
tenant_name=None,
create_state={},
delete_state={},
tags={},
# Attributes that are not serialized
is_locked=False,
create_state_machine=None,
delete_state_machine=None,
# Attributes set by the launcher
_lscores=None,
)

View File

@@ -46,13 +46,18 @@ class BaseProviderFlavor(metaclass=abc.ABCMeta):
def __init__(self, config):
self.project_canonical_name = config['project_canonical_name']
self.name = config['name']
self.public_ipv4 = config.get('public-ipv4', False)
class BaseProviderLabel(metaclass=abc.ABCMeta):
def __init__(self, config):
self.project_canonical_name = config['project_canonical_name']
self.name = config['name']
self.flavor = config.get('flavor')
self.image = config.get('image')
self.min_ready = config.get('min-ready', 0)
self.tags = config.get('tags', {})
self.key_name = config.get('key-name')
class BaseProviderEndpoint(metaclass=abc.ABCMeta):
@@ -148,7 +153,7 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin,
def parseFlavors(self, config):
flavors = {}
for flavor_config in config.get('flavors', []):
f = self.parseLabel(flavor_config)
f = self.parseFlavor(flavor_config)
flavors[f.name] = f
return flavors
@@ -199,24 +204,52 @@ class BaseProvider(zkobject.PolymorphicZKObjectMixin,
def hasLabel(self, label):
return label in self.labels
def getCreateStateMachine(self, hostname, label,
image_external_id, metadata,
def getNodeTags(self, system_id, request, provider, label,
node_uuid):
"""Return the tags that should be stored with the node
:param str system_id: The Zuul system uuid
:param NodesetRequest request: The node request
:param Provider provider: The cloud provider
:param ProviderLabel label: The node label
:param str node_uuid: The node uuid
"""
tags = dict()
# TODO: add other potentially useful attrs from nodepool
attributes = model.Attributes(
request_id=request.uuid,
tenant_name=provider.tenant_name,
)
for k, v in label.tags.items():
try:
tags[k] = v.format(**attributes)
except Exception:
self.log.exception("Error formatting metadata %s", k)
fixed = {
'zuul_system_id': system_id,
'zuul_provider_name': self.tenant_scoped_name,
'zuul_provider_cname': self.canonical_name,
'zuul_node_uuid': node_uuid,
}
tags.update(fixed)
return tags
def getCreateStateMachine(self, node,
image_external_id,
log):
"""Return a state machine suitable for creating an instance
This method should return a new state machine object
initialized to create the described node.
:param str hostname: The hostname of the node.
:param ProviderNode node: The node object.
:param ProviderLabel label: A config object representing the
provider-label for the node.
:param str image_external_id: If provided, the external id of
a previously uploaded image; if None, then the adapter should
look up a cloud image based on the label.
:param metadata dict: A dictionary of metadata that must be
stored on the instance in the cloud. The same data must be
able to be returned later on :py:class:`Instance` objects
returned from `listInstances`.
:param log Logger: A logger instance for emitting annotated
logs related to the request.
@@ -385,6 +418,8 @@ class BaseProviderSchema(metaclass=abc.ABCMeta):
'description': str,
'image': str,
'flavor': str,
'tags': dict,
'key-name': str,
})
return schema
@@ -408,6 +443,7 @@ class BaseProviderSchema(metaclass=abc.ABCMeta):
vs.Required('project_canonical_name'): str,
vs.Required('name'): str,
'description': str,
'public-ipv4': bool,
})
return schema