Move dcmanager orchestration to a separate process

1) Remove DC manager orchestration from dcmanager-manager process
2) Create dcmanager-orchestrator process and associated files
3) Add new RPC calls for dcmanager-orchestrator process to notify
dcmanager
4) Create/update unit tests, to verify the implementation
changes

Story: 2007267
Task: 40734
Change-Id: Ibbbae77558a8a8fd95b636fa6c3aebb1dfefb514
Signed-off-by: Jessica Castelino <jessica.castelino@windriver.com>
This commit is contained in:
Jessica Castelino
2020-08-24 13:19:14 -04:00
parent 64caf6de7c
commit eb97f4c8b6
62 changed files with 784 additions and 228 deletions

View File

@@ -0,0 +1,172 @@
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import time
from dccommon.drivers.openstack import vim
from dcmanager.common import consts
from dcmanager.common.exceptions import StrategyStoppedException
from dcmanager.db import api as db_api
from dcmanager.orchestrator.states.base import BaseState
# Applying the vim update strategy may result in a loss of communication
# where API calls fail. The max time in this phase is 30 minutes
# (30 queries with 1 minute sleep)
DEFAULT_MAX_FAILED_QUERIES = 30
# Max time: 60 minutes = 60 queries x 60 seconds
# This is the max time for the state to change completion progress percent
DEFAULT_MAX_WAIT_ATTEMPTS = 60
# each loop while waiting for the apply will sleep for 60 seconds
WAIT_INTERVAL = 60
class ApplyingVIMStrategyState(BaseState):
"""State for creating the VIM FPGA update strategy."""
def __init__(self, region_name):
super(ApplyingVIMStrategyState, self).__init__(
next_state=consts.STRATEGY_STATE_FINISHING_FW_UPDATE, region_name=region_name)
self.max_failed_queries = DEFAULT_MAX_FAILED_QUERIES
self.wait_attempts = DEFAULT_MAX_WAIT_ATTEMPTS
self.wait_interval = WAIT_INTERVAL
def perform_state_action(self, strategy_step):
"""Apply a FPGA update strategy using VIM REST API
This code derives from patch orchestration: do_apply_subcloud_strategy
Any client (vim, sysinv, etc..) should be re-queried whenever used
to ensure the keystone token is up to date.
Any exceptions raised by this method set the strategy to FAILED
Returns the next state for the state machine if successful.
"""
region = self.get_region_name(strategy_step)
# query the vim strategy.
# Do not raise the default exception if there is no strategy
# because the default exception is unclear: ie: "Get strategy failed"
subcloud_strategy = self.get_vim_client(region).get_strategy(
strategy_name=vim.STRATEGY_NAME_FW_UPDATE,
raise_error_if_missing=False)
if subcloud_strategy is None:
self.info_log(strategy_step, "Skip. There is no strategy to apply")
return self.next_state
# We have a VIM strategy, but need to check if it is ready to apply
if subcloud_strategy.state == vim.STATE_READY_TO_APPLY:
# An exception here will fail this state
subcloud_strategy = self.get_vim_client(region).apply_strategy(
strategy_name=vim.STRATEGY_NAME_FW_UPDATE)
if subcloud_strategy.state == vim.STATE_APPLYING:
self.info_log(strategy_step, "VIM Strategy apply in progress")
else:
raise Exception("VIM strategy apply failed - "
"unexpected strategy state %s"
% subcloud_strategy.state)
# wait for the new strategy to apply or an existing strategy.
# Loop until the strategy applies. Repeatedly query the API
# This can take a long time.
# Waits for up to 60 minutes for the current phase or completion
# percentage to change before giving up.
wait_count = 0
get_fail_count = 0
last_details = ""
auth_failure = False
while True:
# todo(abailey): combine the sleep and stop check into one method
# which would allow the longer 60 second sleep to be broken into
# multiple smaller sleep calls
# If event handler stop has been triggered, fail the state
if self.stopped():
raise StrategyStoppedException()
# break out of the loop if the max number of attempts is reached
wait_count += 1
if wait_count >= self.wait_attempts:
raise Exception("Timeout applying firmware strategy.")
# every loop we wait, even the first one
time.sleep(self.wait_interval)
# get the strategy
try:
subcloud_strategy = self.get_vim_client(region).get_strategy(
strategy_name=vim.STRATEGY_NAME_FW_UPDATE,
raise_error_if_missing=False)
auth_failure = False
get_fail_count = 0
except Exception as e:
if e.message == vim.VIM_AUTHORIZATION_FAILED:
# Since it can take hours to apply a strategy, there is a
# chance our keystone token will expire. Attempt to get
# a new token (by re-creating the client) and re-try the
# request, but only once.
if not auth_failure:
auth_failure = True
self.log_info(strategy_step,
"Authorization failure getting strategy."
" Retrying...")
continue
else:
raise Exception("Repeated authorization failure "
"getting firmware update strategy")
else:
# When applying the strategy to a subcloud, the VIM can
# be unreachable for a significant period of time when
# there is a controller swact, or in the case of AIO-SX,
# when the controller reboots.
get_fail_count += 1
if get_fail_count >= self.max_failed_queries:
# We have waited too long.
raise Exception("Timeout during recovery of apply "
"firmware strategy.")
self.debug_log(strategy_step,
"Unable to get firmware strategy - "
"attempt %d" % get_fail_count)
continue
# The loop gets here if the API is able to respond
# Check if the strategy no longer exists. This should not happen.
if subcloud_strategy is None:
raise Exception("Firmware strategy disappeared while applying")
elif subcloud_strategy.state == vim.STATE_APPLYING:
# Still applying. Update details if it has changed
new_details = ("%s phase is %s%% complete" % (
subcloud_strategy.current_phase,
subcloud_strategy.current_phase_completion_percentage))
if new_details != last_details:
# Progress is being made.
# Reset the counter and log the progress
last_details = new_details
wait_count = 0
self.info_log(strategy_step, new_details)
db_api.strategy_step_update(self.context,
strategy_step.subcloud_id,
details=new_details)
elif subcloud_strategy.state == vim.STATE_APPLIED:
# Success. Break out of loop
self.info_log(strategy_step,
"Firmware strategy has been applied")
break
elif subcloud_strategy.state in [vim.STATE_APPLY_FAILED,
vim.STATE_APPLY_TIMEOUT]:
# Explicit known failure states
raise Exception("Firmware strategy apply failed. %s. %s"
% (subcloud_strategy.state,
subcloud_strategy.apply_phase.reason))
else:
# Other states are bad
raise Exception("Firmware strategy apply failed. "
"Unexpected State: %s."
% subcloud_strategy.state)
# end of loop
# Success, state machine can proceed to the next state
return self.next_state

View File

@@ -0,0 +1,132 @@
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import time
from dccommon.drivers.openstack import vim
from dcmanager.common import consts
from dcmanager.common.exceptions import StrategyStoppedException
from dcmanager.common import utils as dcmanager_utils
from dcmanager.orchestrator.states.base import BaseState
# Max time: 30 minutes = 180 queries x 10 seconds between
DEFAULT_MAX_QUERIES = 180
DEFAULT_SLEEP_DURATION = 10
class CreatingVIMStrategyState(BaseState):
"""State for creating the VIM FPGA update strategy."""
def __init__(self, region_name):
super(CreatingVIMStrategyState, self).__init__(
next_state=consts.STRATEGY_STATE_APPLYING_FW_UPDATE_STRATEGY, region_name=region_name)
# max time to wait for the strategy to be built (in seconds)
# is: sleep_duration * max_queries
self.sleep_duration = DEFAULT_SLEEP_DURATION
self.max_queries = DEFAULT_MAX_QUERIES
def _create_vim_strategy(self, strategy_step, region):
self.info_log(strategy_step, "Creating VIM firmware strategy")
# Get the update options
opts_dict = dcmanager_utils.get_sw_update_opts(
self.context,
for_sw_update=True,
subcloud_id=strategy_step.subcloud_id)
# Call the API to build the firmware strategy
# max-parallel-workers cannot be less than 2 or greater than 5
subcloud_strategy = self.get_vim_client(region).create_strategy(
vim.STRATEGY_NAME_FW_UPDATE,
opts_dict['storage-apply-type'],
opts_dict['worker-apply-type'],
2, # opts_dict['max-parallel-workers'],
opts_dict['default-instance-action'],
opts_dict['alarm-restriction-type'])
# a successful API call to create MUST set the state be 'building'
if subcloud_strategy.state != vim.STATE_BUILDING:
raise Exception("Unexpected VIM strategy build state: %s"
% subcloud_strategy.state)
return subcloud_strategy
def perform_state_action(self, strategy_step):
"""Create a FPGA update strategy using VIM REST API
Any client (vim, sysinv, etc..) should be re-queried whenever used
to ensure the keystone token is up to date.
Any exceptions raised by this method set the strategy to FAILED
Returns the next state for the state machine if successful.
"""
region = self.get_region_name(strategy_step)
# Get the existing firmware strategy, which may be None
subcloud_strategy = self.get_vim_client(region).get_strategy(
strategy_name=vim.STRATEGY_NAME_FW_UPDATE,
raise_error_if_missing=False)
if subcloud_strategy is None:
subcloud_strategy = self._create_vim_strategy(strategy_step,
region)
else:
self.info_log(strategy_step,
"FW VIM strategy exists with state: %s"
% subcloud_strategy.state)
# if a strategy exists in any type of failed state or aborted
# state it should be deleted.
# applied state should also be deleted from previous success runs.
if subcloud_strategy.state in [vim.STATE_BUILD_FAILED,
vim.STATE_BUILD_TIMEOUT,
vim.STATE_APPLY_FAILED,
vim.STATE_APPLY_TIMEOUT,
vim.STATE_ABORTED,
vim.STATE_ABORT_FAILED,
vim.STATE_ABORT_TIMEOUT,
vim.STATE_APPLIED]:
self.info_log(strategy_step,
"Deleting existing FW VIM strategy")
self.get_vim_client(region).delete_strategy(
strategy_name=vim.STRATEGY_NAME_FW_UPDATE)
# re-create it
subcloud_strategy = self._create_vim_strategy(strategy_step,
region)
# A strategy already exists, or is being built
# Loop until the strategy is done building Repeatedly query the API
counter = 0
while True:
# If event handler stop has been triggered, fail the state
if self.stopped():
raise StrategyStoppedException()
if counter >= self.max_queries:
raise Exception("Timeout building vim strategy. state: %s"
% subcloud_strategy.state)
counter += 1
time.sleep(self.sleep_duration)
# query the vim strategy to see if it is in the new state
subcloud_strategy = self.get_vim_client(region).get_strategy(
strategy_name=vim.STRATEGY_NAME_FW_UPDATE,
raise_error_if_missing=True)
if subcloud_strategy.state == vim.STATE_READY_TO_APPLY:
self.info_log(strategy_step, "VIM strategy has been built")
break
elif subcloud_strategy.state == vim.STATE_BUILDING:
# This is the expected state while creating the strategy
pass
elif subcloud_strategy.state == vim.STATE_BUILD_FAILED:
raise Exception("VIM strategy build failed: %s. %s."
% (subcloud_strategy.state,
subcloud_strategy.build_phase.reason))
elif subcloud_strategy.state == vim.STATE_BUILD_TIMEOUT:
raise Exception("VIM strategy build timed out: %s."
% subcloud_strategy.state)
else:
raise Exception("VIM strategy unexpected build state: %s"
% subcloud_strategy.state)
# Success, state machine can proceed to the next state
return self.next_state

View File

@@ -0,0 +1,114 @@
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from dccommon.drivers.openstack import vim
from dcmanager.common import consts
from dcmanager.orchestrator.states.base import BaseState
from dcmanager.orchestrator.states.firmware import utils
from dcmanager.rpc import client as dcmanager_rpc_client
from dcorch.common import consts as dcorch_consts
class FinishingFwUpdateState(BaseState):
"""State for finishing the firmware update."""
def __init__(self, region_name):
super(FinishingFwUpdateState, self).__init__(
next_state=consts.STRATEGY_STATE_COMPLETE, region_name=region_name)
def align_subcloud_status(self, strategy_step):
self.info_log(strategy_step,
"Setting endpoint status of %s to %s"
% (dcorch_consts.ENDPOINT_TYPE_FIRMWARE,
consts.SYNC_STATUS_IN_SYNC))
rpc_client = dcmanager_rpc_client.ManagerClient()
# The subcloud name is the same as the region in the strategy_step
rpc_client.update_subcloud_endpoint_status(
self.context,
subcloud_name=self.get_region_name(strategy_step),
endpoint_type=dcorch_consts.ENDPOINT_TYPE_FIRMWARE,
sync_status=consts.SYNC_STATUS_IN_SYNC)
def perform_state_action(self, strategy_step):
"""Finish the firmware update.
Any client (vim, sysinv, etc..) should be re-queried whenever used
to ensure the keystone token is up to date.
Any exceptions raised by this method set the strategy to FAILED
Returns the next state for the state machine if successful.
"""
# Possible things that need to be done in this state:
# - delete the vim fw update strategy
# - clean up files
# - report information about the firmware on the subcloud
region = self.get_region_name(strategy_step)
# Get the existing firmware strategy, which may be None
subcloud_strategy = self.get_vim_client(region).get_strategy(
strategy_name=vim.STRATEGY_NAME_FW_UPDATE,
raise_error_if_missing=False)
if subcloud_strategy is not None:
self.info_log(strategy_step,
"Deleting FW VIM strategy that has state: %s"
% subcloud_strategy.state)
self.get_vim_client(region).delete_strategy(
strategy_name=vim.STRATEGY_NAME_FW_UPDATE)
# FINAL CHECK
# if any of the device images are in failed state, fail this state
# only check for enabled devices matching images with applied labels
# get the list of enabled devices on the subcloud
enabled_host_device_list = []
subcloud_hosts = self.get_sysinv_client(region).get_hosts()
for host in subcloud_hosts:
host_devices = self.get_sysinv_client(
region).get_host_device_list(host.uuid)
for device in host_devices:
if device.enabled:
enabled_host_device_list.append(device)
if not enabled_host_device_list:
# There are no enabled devices in this subcloud, so break out
# of this handler, since there will be nothing examine
self.info_log(strategy_step, "No enabled devices.")
# This is the final state for this subcloud. set it to in-sync
self.align_subcloud_status(strategy_step)
return self.next_state
# determine list of applied subcloud images
subcloud_images = self.get_sysinv_client(region).get_device_images()
applied_subcloud_images = \
utils.filter_applied_images(subcloud_images,
expected_value=True)
# Retrieve the device image states on this subcloud.
subcloud_device_image_states = self.get_sysinv_client(
region).get_device_image_states()
device_map = utils.to_uuid_map(enabled_host_device_list)
image_map = utils.to_uuid_map(applied_subcloud_images)
# loop over all states to see which are not complete
# if any correspond to an enabled device, fail this handler
failed_states = []
for device_image_state_obj in subcloud_device_image_states:
if device_image_state_obj.status != utils.DEVICE_IMAGE_UPDATE_COMPLETED:
device = device_map.get(device_image_state_obj.pcidevice_uuid)
if device is not None:
image = image_map.get(device_image_state_obj.image_uuid)
if image is not None:
self.info_log(strategy_step,
"Failed apply: %s"
% device_image_state_obj)
failed_states.append(device_image_state_obj)
if failed_states:
# todo(abailey): create a custom Exception
raise Exception("Not all images applied successfully")
# This is the final state for this subcloud. set it to in-sync
self.align_subcloud_status(strategy_step)
# Success, state machine can proceed to the next state
return self.next_state

View File

@@ -0,0 +1,182 @@
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import os
from dcmanager.common import consts
from dcmanager.orchestrator.states.base import BaseState
from dcmanager.orchestrator.states.firmware import utils
class ImportingFirmwareState(BaseState):
"""State for importing firmware
Query the device-images on the system controller that are 'pending'
Ensure those device images are uploaded on the subcloud.
"""
def __init__(self, region_name):
super(ImportingFirmwareState, self).__init__(
next_state=consts.STRATEGY_STATE_CREATING_FW_UPDATE_STRATEGY, region_name=region_name)
def _image_in_list(self, image, image_list):
# todo(abailey): FUTURE. There may be other ways that two images can
# be considered identical other than a database UUID
for img in image_list:
if img.uuid == image.uuid:
return True
return False
def perform_state_action(self, strategy_step):
"""Import firmware on a subcloud
Any client (vim, sysinv, etc..) should be re-queried whenever used
to ensure the keystone token is up to date.
Any exceptions raised by this method set the strategy to FAILED
Returns the next state for the state machine if successful.
"""
# The comparisons in this method need to align with the logic in
# subcloud_firmware_audit
# ============== query system controller images ==============
system_controller_images = self.get_sysinv_client(
consts.DEFAULT_REGION_NAME).get_device_images()
# determine list of applied system controller images
applied_system_controller_images = \
utils.filter_applied_images(system_controller_images,
expected_value=True)
# ============== query subcloud images ========================
region = self.get_region_name(strategy_step)
subcloud_images = self.get_sysinv_client(
region).get_device_images()
# determine list of applied subcloud images
applied_subcloud_images = \
utils.filter_applied_images(subcloud_images,
expected_value=True)
subcloud_device_label_list = self.get_sysinv_client(
region).get_device_label_list()
subcloud_labels = []
for device_label in subcloud_device_label_list:
subcloud_labels.append({device_label.label_key:
device_label.label_value})
# - remove any applied images in subcloud that are not applied on the
# system controller
for image in applied_subcloud_images:
if not self._image_in_list(image,
applied_system_controller_images):
# the applied image in the subcloud is not in the system
# controller applied list, and should be removed
# Use the existing labels on the image for the remove
labels = []
for label in image.applied_labels:
# Do not append an empty dictionary
if label:
labels.append(label)
self.info_log(strategy_step,
"Remove Image %s by labels: %s" % (image.uuid,
str(labels)))
self.get_sysinv_client(region).remove_device_image(
image.uuid,
labels)
# get the list of enabled devices on the subcloud
enabled_host_device_list = []
subcloud_hosts = self.get_sysinv_client(region).get_hosts()
for host in subcloud_hosts:
host_devices = self.get_sysinv_client(
region).get_host_device_list(host.uuid)
for device in host_devices:
if device.enabled:
enabled_host_device_list.append(device)
if not enabled_host_device_list:
# There are no enabled devices in this subcloud, so break out
# of this handler, since there will be nothing to upload or apply
self.info_log(strategy_step,
"No enabled devices. Skipping upload and apply.")
return self.next_state
# Retrieve the device image states on this subcloud.
subcloud_device_image_states = self.get_sysinv_client(
region).get_device_image_states()
# go through the applied images on system controller
# any of the images that correspond to an enabled device on the
# subcloud should be uploaded and applied if it does not exist
for image in applied_system_controller_images:
device = utils.check_subcloud_device_has_image(
image,
enabled_host_device_list,
subcloud_device_label_list)
if device is not None:
# there was a matching device for that image
# We need to upload it if it does not exist yet
if not self._image_in_list(image, subcloud_images):
self.info_log(strategy_step,
"Uploading image:%s " % image.uuid)
bitstreamfile = utils.determine_image_file(image)
if not os.path.isfile(bitstreamfile):
# We could not find the file in the vault
raise Exception("File does not exist: %s"
% bitstreamfile)
fields = utils.determine_image_fields(image)
new_image_response = self.get_sysinv_client(
region).upload_device_image(bitstreamfile, fields)
self.debug_log(strategy_step,
"Upload device image returned: %s"
% str(new_image_response))
self.info_log(strategy_step,
"Uploaded image:%s " % image.uuid)
# The image exists on the subcloud
# However, it may not have been applied to this device
device_image_state = None
for device_image_state_obj in subcloud_device_image_states:
if device_image_state_obj.pcidevice_uuid == device.uuid\
and device_image_state_obj.image_uuid == image.uuid:
device_image_state = device_image_state_obj
break
else:
# If no device image state is present in the list that
# means the image hasn't been applied yet
# apply with ALL the labels declared for this image on
# system controller
labels = []
for label in image.applied_labels:
# Do not append an empty dictionary
if label:
labels.append(label)
self.info_log(strategy_step,
"Applying device image:%s with labels:%s"
% (image.uuid, str(labels)))
apply_response = self.get_sysinv_client(
region).apply_device_image(image.uuid, labels=labels)
self.debug_log(strategy_step,
"Apply device image returned: %s"
% str(apply_response))
self.info_log(strategy_step,
"Applied image:%s with labels:%s"
% (image.uuid, str(labels)))
continue
# We have a device_image_state. Lets examine the apply status
if device_image_state.status != utils.DEVICE_IMAGE_UPDATE_COMPLETED:
self.info_log(strategy_step,
"Image:%s has not been written. State:%s"
% (image.uuid, device_image_state.status))
else:
self.info_log(strategy_step,
"Skipping already applied image:%s "
% image.uuid)
# If none of those API calls failed, this state was successful
# Success, state machine can proceed to the next state
return self.next_state

View File

@@ -0,0 +1,133 @@
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import os
# Device Image Status - duplicated from sysinv/common/device.py
DEVICE_IMAGE_UPDATE_PENDING = 'pending'
DEVICE_IMAGE_UPDATE_IN_PROGRESS = 'in-progress'
DEVICE_IMAGE_UPDATE_IN_PROGRESS_ABORTED = 'in-progress-aborted'
DEVICE_IMAGE_UPDATE_COMPLETED = 'completed'
DEVICE_IMAGE_UPDATE_FAILED = 'failed'
DEVICE_IMAGE_UPDATE_NULL = ''
# convert a list of objects that have a uuid field, into a map keyed on uuid
def to_uuid_map(list_with_uuids):
uuid_map = {}
for uuid_obj in list_with_uuids:
uuid_map[uuid_obj.uuid] = uuid_obj
return uuid_map
# todo(abailey) refactor based on firmware_audit code for
# _check_subcloud_device_has_image
# THIS METHOD should be renamed !!
def check_subcloud_device_has_image(image,
enabled_host_device_list,
subcloud_device_label_list):
"""Return device on subcloud that matches the image, or None"""
apply_to_all_devices = False
if image.applied_labels:
# Returns true if the list contains at least one empty dict.
# An empty dict signifies that image is to be applied to
# all devices that match the pci vendor and pci device ID.
apply_to_all_devices = any(not image for image in image.applied_labels)
for device in enabled_host_device_list:
if not apply_to_all_devices:
# If image has to be applied to devices with a matching label
# and the device label list is empty on the subcloud, there
# cannot be a match. break out of the loop and return None
if not subcloud_device_label_list:
break
# Device is considered eligible if device labels
# match at least one of the image labels
is_device_eligible = False
for image_label in image.applied_labels:
label_key = list(image_label.keys())[0]
label_value = image_label.get(label_key)
is_device_eligible = check_for_label_match(
subcloud_device_label_list,
device.uuid,
label_key,
label_value)
# If device label matches any image label stop checking
# for any other label matches and do pci comparison below
if is_device_eligible:
break
# If this device is not eligible, go to the next device
if not is_device_eligible:
continue
# We found an eligible device
if image.pci_vendor == device.pvendor_id and \
image.pci_device == device.pdevice_id:
return device
# no matching devices
return None
# todo(abailey): refactor with https://review.opendev.org/#/c/741515
def get_device_image_filename(resource):
filename = "{}-{}-{}-{}.bit".format(resource.bitstream_type,
resource.pci_vendor,
resource.pci_device,
resource.uuid)
return filename
# todo(abailey): use constant from https://review.opendev.org/#/c/741515
def determine_image_file(image):
"""Find the bitstream file for an image in the vault"""
DEVICE_IMAGE_VAULT_DIR = '/opt/dc-vault/device_images'
return os.path.join(DEVICE_IMAGE_VAULT_DIR,
get_device_image_filename(image))
def determine_image_fields(image):
"""Return the appropriate upload fields for an image"""
field_list = ['uuid',
'bitstream_type',
'pci_vendor',
'pci_device',
'bitstream_id',
'key_signature',
'revoke_key_id',
'name',
'description',
'image_version']
fields = dict((k, str(v)) for (k, v) in vars(image).items()
if k in field_list and not (v is None))
return fields
def check_for_label_match(subcloud_host_device_label_list,
device_uuid,
label_key,
label_value):
# todo(abailey): should this compare pci_device_uuid or vendor/device
for device_label in subcloud_host_device_label_list:
if device_label.pcidevice_uuid and \
device_uuid == device_label.pcidevice_uuid and \
label_key == device_label.label_key and \
label_value == device_label.label_value:
return True
return False
def filter_applied_images(device_images, expected_value=True):
"""Filter a list of DeviceImage objects by the 'applied' field
Returns list of images that have 'applied' field matching expected_value
"""
filtered_images = []
for device_image in device_images:
if device_image.applied == expected_value:
filtered_images.append(device_image)
return filtered_images