# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Handles all processes relating to instances (guest vms).
The :py:class:`ComputeManager` class is a :py:class:`nova.manager.Manager` that
handles RPC calls relating to creating instances. It is responsible for
building a disk image, launching it via the underlying virtualization driver,
responding to calls to check its state, attaching persistent storage, and
terminating it.
"""
import base64
import binascii
import contextlib
import copy
import functools
import inspect
import sys
import time
import traceback
import typing as ty
from cinderclient import exceptions as cinder_exception
from cursive import exception as cursive_exception
import eventlet.event
from eventlet import greenthread
import eventlet.semaphore
import eventlet.timeout
import futurist
from keystoneauth1 import exceptions as keystone_exception
import os_traits
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from oslo_service import loopingcall
from oslo_service import periodic_task
from oslo_utils import excutils
from oslo_utils import strutils
from oslo_utils import timeutils
from oslo_utils import units
from nova.accelerator import cyborg
from nova import block_device
from nova.compute import api as compute
from nova.compute import build_results
from nova.compute import claims
from nova.compute import power_state
from nova.compute import resource_tracker
from nova.compute import rpcapi as compute_rpcapi
from nova.compute import task_states
from nova.compute import utils as compute_utils
from nova.compute.utils import wrap_instance_event
from nova.compute import vm_states
from nova import conductor
import nova.conf
import nova.context
from nova import exception
from nova import exception_wrapper
from nova.i18n import _
from nova.image import glance
from nova import manager
from nova.network import model as network_model
from nova.network import neutron
from nova import objects
from nova.objects import base as obj_base
from nova.objects import external_event as external_event_obj
from nova.objects import fields
from nova.objects import instance as obj_instance
from nova.objects import migrate_data as migrate_data_obj
from nova.pci import request as pci_req_module
from nova.pci import whitelist
from nova import safe_utils
from nova.scheduler.client import query
from nova.scheduler.client import report
from nova.scheduler import utils as scheduler_utils
from nova import utils
from nova.virt import block_device as driver_block_device
from nova.virt import configdrive
from nova.virt import driver
from nova.virt import event as virtevent
from nova.virt import hardware
from nova.virt import storage_users
from nova.virt import virtapi
from nova.volume import cinder
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
wrap_exception = functools.partial(
exception_wrapper.wrap_exception, service='compute', binary='nova-compute')
@contextlib.contextmanager
def errors_out_migration_ctxt(migration):
"""Context manager to error out migration on failure."""
try:
yield
except Exception:
with excutils.save_and_reraise_exception():
if migration:
# We may have been passed None for our migration if we're
# receiving from an older client. The migration will be
# errored via the legacy path.
migration.status = 'error'
try:
migration.save()
except Exception:
LOG.debug(
'Error setting migration status for instance %s.',
migration.instance_uuid, exc_info=True)
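# Illustrative usage sketch (editor's example, not part of the module): any
# exception raised inside the block flips the migration record to 'error'
# before re-raising; _resize_disks() here is a hypothetical helper.
#
#     with errors_out_migration_ctxt(migration):
#         _resize_disks(context, instance, migration)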
@utils.expects_func_args('migration')
def errors_out_migration(function):
"""Decorator to error out migration on failure."""
@functools.wraps(function)
def decorated_function(self, context, *args, **kwargs):
wrapped_func = safe_utils.get_wrapped_function(function)
keyed_args = inspect.getcallargs(wrapped_func, self, context,
*args, **kwargs)
migration = keyed_args['migration']
with errors_out_migration_ctxt(migration):
return function(self, context, *args, **kwargs)
return decorated_function
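# Illustrative usage sketch (editor's example): the decorated method must
# accept a 'migration' argument, which inspect.getcallargs() resolves whether
# it is passed positionally or by keyword:
#
#     @errors_out_migration
#     def confirm_resize(self, context, instance, migration):
#         ...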
@utils.expects_func_args('instance')
def reverts_task_state(function):
"""Decorator to revert task_state on failure."""
@functools.wraps(function)
def decorated_function(self, context, *args, **kwargs):
try:
return function(self, context, *args, **kwargs)
except exception.UnexpectedTaskStateError as e:
# Note(maoy): unexpected task state means the current
# task is preempted. Do not clear task state in this
# case.
with excutils.save_and_reraise_exception():
LOG.info("Task possibly preempted: %s",
e.format_message())
except Exception:
with excutils.save_and_reraise_exception():
wrapped_func = safe_utils.get_wrapped_function(function)
keyed_args = inspect.getcallargs(wrapped_func, self, context,
*args, **kwargs)
# NOTE(mriedem): 'instance' must be in keyed_args because we
# have utils.expects_func_args('instance') decorating this
# method.
instance = keyed_args['instance']
original_task_state = instance.task_state
try:
self._instance_update(context, instance, task_state=None)
LOG.info("Successfully reverted task state from %s on "
"failure for instance.",
original_task_state, instance=instance)
except exception.InstanceNotFound:
                    # We might delete an instance that failed to build shortly
                    # after it errored out; this is an expected case and we
                    # should not trace on it.
pass
except Exception as e:
LOG.warning("Failed to revert task state for instance. "
"Error: %s", e, instance=instance)
return decorated_function
@utils.expects_func_args('instance')
def wrap_instance_fault(function):
"""Wraps a method to catch exceptions related to instances.
This decorator wraps a method to catch any exceptions having to do with
an instance that may get thrown. It then logs an instance fault in the db.
"""
@functools.wraps(function)
def decorated_function(self, context, *args, **kwargs):
try:
return function(self, context, *args, **kwargs)
except exception.InstanceNotFound:
raise
except Exception as e:
# NOTE(gtt): If argument 'instance' is in args rather than kwargs,
# we will get a KeyError exception which will cover up the real
            # exception. So, we update kwargs with the values from args first;
            # then, we can get 'instance' from kwargs easily.
kwargs.update(dict(zip(function.__code__.co_varnames[2:], args)))
with excutils.save_and_reraise_exception():
compute_utils.add_instance_fault_from_exc(context,
kwargs['instance'], e, sys.exc_info())
return decorated_function
@utils.expects_func_args('image_id', 'instance')
def delete_image_on_error(function):
"""Used for snapshot related method to ensure the image created in
compute.api is deleted when an error occurs.
"""
@functools.wraps(function)
def decorated_function(self, context, image_id, instance,
*args, **kwargs):
try:
return function(self, context, image_id, instance,
*args, **kwargs)
except Exception:
with excutils.save_and_reraise_exception():
compute_utils.delete_image(
context, instance, self.image_api, image_id,
log_exc_info=True)
return decorated_function
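# Illustrative usage sketch (editor's example): snapshot handlers take
# (context, image_id, instance) so the decorator can delete the half-created
# Glance image if the snapshot fails:
#
#     @delete_image_on_error
#     def snapshot_instance(self, context, image_id, instance):
#         ...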
# Each collection of events is a dict of eventlet Events keyed by a tuple of
# event name and associated tag
_InstanceEvents = ty.Dict[ty.Tuple[str, str], eventlet.event.Event]
class InstanceEvents(object):
def __init__(self):
self._events: ty.Optional[ty.Dict[str, _InstanceEvents]] = {}
@staticmethod
def _lock_name(instance) -> str:
return '%s-%s' % (instance.uuid, 'events')
def prepare_for_instance_event(
self,
instance: 'objects.Instance',
name: str,
tag: str,
) -> eventlet.event.Event:
"""Prepare to receive an event for an instance.
This will register an event for the given instance that we will
wait on later. This should be called before initiating whatever
action will trigger the event. The resulting eventlet.event.Event
object should be wait()'d on to ensure completion.
:param instance: the instance for which the event will be generated
:param name: the name of the event we're expecting
:param tag: the tag associated with the event we're expecting
:returns: an event object that should be wait()'d on
"""
@utils.synchronized(self._lock_name(instance))
def _create_or_get_event():
if self._events is None:
# NOTE(danms): We really should have a more specific error
# here, but this is what we use for our default error case
raise exception.NovaException(
'In shutdown, no new events can be scheduled')
instance_events = self._events.setdefault(instance.uuid, {})
return instance_events.setdefault((name, tag),
eventlet.event.Event())
LOG.debug('Preparing to wait for external event %(name)s-%(tag)s',
{'name': name, 'tag': tag}, instance=instance)
return _create_or_get_event()
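    # Illustrative usage sketch (editor's example): register interest before
    # triggering the external action, then block on the returned primitive;
    # 'network-vif-plugged' is a standard external event name and port_id a
    # placeholder tag:
    #
    #     event = self.instance_events.prepare_for_instance_event(
    #         instance, 'network-vif-plugged', port_id)
    #     ...  # kick off the action that will emit the event
    #     result = event.wait()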
def pop_instance_event(self, instance, event):
"""Remove a pending event from the wait list.
This will remove a pending event from the wait list so that it
can be used to signal the waiters to wake up.
:param instance: the instance for which the event was generated
:param event: the nova.objects.external_event.InstanceExternalEvent
that describes the event
:returns: the eventlet.event.Event object on which the waiters
are blocked
"""
no_events_sentinel = object()
no_matching_event_sentinel = object()
@utils.synchronized(self._lock_name(instance))
def _pop_event():
if self._events is None:
LOG.debug('Unexpected attempt to pop events during shutdown',
instance=instance)
return no_events_sentinel
events = self._events.get(instance.uuid)
if not events:
return no_events_sentinel
_event = events.pop((event.name, event.tag), None)
if not events:
del self._events[instance.uuid]
if _event is None:
return no_matching_event_sentinel
return _event
result = _pop_event()
if result is no_events_sentinel:
            LOG.debug('No waiting events found while dispatching %(event)s',
{'event': event.key},
instance=instance)
return None
elif result is no_matching_event_sentinel:
LOG.debug(
'No event matching %(event)s in %(events)s',
{
'event': event.key,
# mypy can't identify the none check in _pop_event
'events': self._events.get( # type: ignore
instance.uuid, {}).keys(),
},
instance=instance,
)
return None
else:
return result
def clear_events_for_instance(self, instance):
"""Remove all pending events for an instance.
This will remove all events currently pending for an instance
and return them (indexed by event name).
:param instance: the instance for which events should be purged
:returns: a dictionary of {event_name: eventlet.event.Event}
"""
@utils.synchronized(self._lock_name(instance))
def _clear_events():
if self._events is None:
LOG.debug('Unexpected attempt to clear events during shutdown',
instance=instance)
return dict()
# NOTE(danms): We have historically returned the raw internal
            # format here, which is {event.key: [events, ...]}, so just
# trivially convert it here.
return {'%s-%s' % k: e
for k, e in self._events.pop(instance.uuid, {}).items()}
return _clear_events()
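    # Illustrative note (editor's example): the returned dict is keyed by the
    # 'name-tag' form of the event key, e.g.
    #
    #     {'network-vif-plugged-<port_id>': <eventlet.event.Event>}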
def cancel_all_events(self):
if self._events is None:
LOG.debug('Unexpected attempt to cancel events during shutdown.')
return
our_events = self._events
# NOTE(danms): Block new events
self._events = None
for instance_uuid, events in our_events.items():
for (name, tag), eventlet_event in events.items():
LOG.debug('Canceling in-flight event %(name)s-%(tag)s for '
'instance %(instance_uuid)s',
{'name': name,
'tag': tag,
'instance_uuid': instance_uuid})
event = objects.InstanceExternalEvent(
instance_uuid=instance_uuid,
name=name, status='failed',
tag=tag, data={})
eventlet_event.send(event)
class ComputeVirtAPI(virtapi.VirtAPI):
def __init__(self, compute):
super(ComputeVirtAPI, self).__init__()
self._compute = compute
self.reportclient = compute.reportclient
class ExitEarly(Exception):
def __init__(self, events):
super(Exception, self).__init__()
self.events = events
self._exit_early_exc = ExitEarly
def exit_wait_early(self, events):
"""Exit a wait_for_instance_event() immediately and avoid
waiting for some events.
        :param events: A list of (name, tag) tuples for events that we should
            skip waiting for during a wait_for_instance_event().
"""
raise self._exit_early_exc(events=events)
def _default_error_callback(self, event_name, instance):
raise exception.NovaException(_('Instance event failed'))
@contextlib.contextmanager
def wait_for_instance_event(self, instance, event_names, deadline=300,
error_callback=None):
"""Plan to wait for some events, run some code, then wait.
This context manager will first create plans to wait for the
provided event_names, yield, and then wait for all the scheduled
events to complete.
Note that this uses an eventlet.timeout.Timeout to bound the
operation, so callers should be prepared to catch that
failure and handle that situation appropriately.
If the event is not received by the specified timeout deadline,
eventlet.timeout.Timeout is raised.
If the event is received but did not have a 'completed'
status, a NovaException is raised. If an error_callback is
provided, instead of raising an exception as detailed above
for the failure case, the callback will be called with the
event_name and instance, and can return True to continue
waiting for the rest of the events, False to stop processing,
or raise an exception which will bubble up to the waiter.
If the inner code wishes to abort waiting for one or more
events because it knows some state to be finished or condition
to be satisfied, it can use VirtAPI.exit_wait_early() with a
list of event (name,tag) items to avoid waiting for those
events upon context exit. Note that exit_wait_early() exits
the context immediately and should be used to signal that all
work has been completed and provide the unified list of events
that need not be waited for. Waiting for the remaining events
will begin immediately upon early exit as if the context was
exited normally.
:param instance: The instance for which an event is expected
:param event_names: A list of event names. Each element is a
tuple of strings to indicate (name, tag),
where name is required, but tag may be None.
:param deadline: Maximum number of seconds we should wait for all
of the specified events to arrive.
        :param error_callback: A function to be called if an event arrives
            that is not marked as 'completed' (see above)
"""
if error_callback is None:
error_callback = self._default_error_callback
events = {}
for event_name in event_names:
name, tag = event_name
event_name = objects.InstanceExternalEvent.make_key(name, tag)
try:
events[event_name] = (
self._compute.instance_events.prepare_for_instance_event(
instance, name, tag))
except exception.NovaException:
error_callback(event_name, instance)
# NOTE(danms): Don't wait for any of the events. They
# should all be canceled and fired immediately below,
# but don't stick around if not.
deadline = 0
try:
yield
except self._exit_early_exc as e:
early_events = set([objects.InstanceExternalEvent.make_key(n, t)
for n, t in e.events])
else:
early_events = set([])
with eventlet.timeout.Timeout(deadline):
for event_name, event in events.items():
if event_name in early_events:
continue
else:
actual_event = event.wait()
if actual_event.status == 'completed':
continue
# If we get here, we have an event that was not completed,
# nor skipped via exit_wait_early(). Decide whether to
# keep waiting by calling the error_callback() hook.
decision = error_callback(event_name, instance)
if decision is False:
break
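    # Illustrative usage sketch (editor's example): wait for a VIF plug event
    # around the driver operation that triggers it; 'port_id' is a
    # placeholder tag and the 300-second deadline is arbitrary:
    #
    #     events = [('network-vif-plugged', port_id)]
    #     with virtapi.wait_for_instance_event(instance, events, deadline=300):
    #         driver.plug_vifs(instance, network_info)
    #         # once the work is known complete, the remaining waits can be
    #         # skipped via virtapi.exit_wait_early(events)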
def update_compute_provider_status(self, context, rp_uuid, enabled):
"""Used to add/remove the COMPUTE_STATUS_DISABLED trait on the provider
:param context: nova auth RequestContext
:param rp_uuid: UUID of a compute node resource provider in Placement
:param enabled: True if the node is enabled in which case the trait
would be removed, False if the node is disabled in which case
the trait would be added.
:raises: ResourceProviderTraitRetrievalFailed
:raises: ResourceProviderUpdateConflict
:raises: ResourceProviderUpdateFailed
:raises: TraitRetrievalFailed
:raises: keystoneauth1.exceptions.ClientException
"""
trait_name = os_traits.COMPUTE_STATUS_DISABLED
# Get the current traits (and generation) for the provider.
# TODO(mriedem): Leverage the ProviderTree cache in get_provider_traits
trait_info = self.reportclient.get_provider_traits(context, rp_uuid)
# If the host is enabled, remove the trait (if set), else add
# the trait if it doesn't already exist.
original_traits = trait_info.traits
new_traits = None
if enabled and trait_name in original_traits:
new_traits = original_traits - {trait_name}
LOG.debug('Removing trait %s from compute node resource '
'provider %s in placement.', trait_name, rp_uuid)
elif not enabled and trait_name not in original_traits:
new_traits = original_traits | {trait_name}
LOG.debug('Adding trait %s to compute node resource '
'provider %s in placement.', trait_name, rp_uuid)
if new_traits is not None:
self.reportclient.set_traits_for_provider(
context, rp_uuid, new_traits, generation=trait_info.generation)
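    # Illustrative usage sketch (editor's example): a disabled compute service
    # maps to adding the trait, which the scheduler can then use to filter
    # the host out:
    #
    #     virtapi.update_compute_provider_status(ctxt, cn_uuid, enabled=False)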
class ComputeManager(manager.Manager):
"""Manages the running instances from creation to destruction."""
target = messaging.Target(version='6.0')
def __init__(self, compute_driver=None, *args, **kwargs):
"""Load configuration options and connect to the hypervisor."""
# We want the ComputeManager, ResourceTracker and ComputeVirtAPI all
# using the same instance of SchedulerReportClient which has the
# ProviderTree cache for this compute service.
self.reportclient = report.SchedulerReportClient()
self.virtapi = ComputeVirtAPI(self)
self.network_api = neutron.API()
self.volume_api = cinder.API()
self.image_api = glance.API()
self._last_bw_usage_poll = 0.0
self.compute_api = compute.API()
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
self.compute_task_api = conductor.ComputeTaskAPI()
self.query_client = query.SchedulerQueryClient()
self.instance_events = InstanceEvents()
self._sync_power_pool = eventlet.GreenPool(
size=CONF.sync_power_state_pool_size)
self._syncs_in_progress = {}
self.send_instance_updates = (
CONF.filter_scheduler.track_instance_changes)
if CONF.max_concurrent_builds != 0:
self._build_semaphore = eventlet.semaphore.Semaphore(
CONF.max_concurrent_builds)
else:
self._build_semaphore = compute_utils.UnlimitedSemaphore()
if CONF.max_concurrent_snapshots > 0:
self._snapshot_semaphore = eventlet.semaphore.Semaphore(
CONF.max_concurrent_snapshots)
else:
self._snapshot_semaphore = compute_utils.UnlimitedSemaphore()
if CONF.max_concurrent_live_migrations > 0:
self._live_migration_executor = futurist.GreenThreadPoolExecutor(
max_workers=CONF.max_concurrent_live_migrations)
else:
# CONF.max_concurrent_live_migrations is 0 (unlimited)
self._live_migration_executor = futurist.GreenThreadPoolExecutor()
# This is a dict, keyed by instance uuid, to a two-item tuple of
# migration object and Future for the queued live migration.
self._waiting_live_migrations = {}
super(ComputeManager, self).__init__(service_name="compute",
*args, **kwargs)
# TODO(sbauza): Remove this call once we delete the V5Proxy class
self.additional_endpoints.append(_ComputeV5Proxy(self))
# NOTE(russellb) Load the driver last. It may call back into the
# compute manager via the virtapi, so we want it to be fully
# initialized before that happens.
self.driver = driver.load_compute_driver(self.virtapi, compute_driver)
self.use_legacy_block_device_info = \
self.driver.need_legacy_block_device_info
self.rt = resource_tracker.ResourceTracker(
self.host, self.driver, reportclient=self.reportclient)
def reset(self):
LOG.info('Reloading compute RPC API')
compute_rpcapi.reset_globals()
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
self.reportclient.clear_provider_cache()
def _update_resource_tracker(self, context, instance):
"""Let the resource tracker know that an instance has changed state."""
if instance.host == self.host:
self.rt.update_usage(context, instance, instance.node)
def _instance_update(self, context, instance, **kwargs):
"""Update an instance in the database using kwargs as value."""
for k, v in kwargs.items():
setattr(instance, k, v)
instance.save()
self._update_resource_tracker(context, instance)
def _nil_out_instance_obj_host_and_node(self, instance):
# NOTE(jwcroppe): We don't do instance.save() here for performance
# reasons; a call to this is expected to be immediately followed by
# another call that does instance.save(), thus avoiding two writes
# to the database layer.
instance.host = None
instance.node = None
# ResourceTracker._set_instance_host_and_node also sets launched_on
# to the same value as host and is really only ever used by legacy
# nova-network code, but we should also null it out to avoid confusion
# if there is an instance in the database with no host set but
# launched_on is set. Note that we do not care about using launched_on
        # as some kind of debug helper when diagnosing a build failure; that
        # is what instance action events are for.
instance.launched_on = None
# If the instance is not on a host, it's not in an aggregate and
# therefore is not in an availability zone.
instance.availability_zone = None
def _set_instance_obj_error_state(self, instance, clean_task_state=False):
try:
instance.vm_state = vm_states.ERROR
if clean_task_state:
instance.task_state = None
instance.save()
except exception.InstanceNotFound:
LOG.debug('Instance has been destroyed from under us while '
'trying to set it to ERROR', instance=instance)
def _get_instances_on_driver(self, context, filters=None):
"""Return a list of instance records for the instances found
        on the hypervisor which satisfy the specified filters. If filters is
        None, return a list of instance records for all the instances found
        on the hypervisor.
"""
if not filters:
filters = {}
try:
driver_uuids = self.driver.list_instance_uuids()
if len(driver_uuids) == 0:
# Short circuit, don't waste a DB call
return objects.InstanceList()
filters['uuid'] = driver_uuids
local_instances = objects.InstanceList.get_by_filters(
context, filters, use_slave=True)
return local_instances
except NotImplementedError:
pass
# The driver doesn't support uuids listing, so we'll have
# to brute force.
driver_instances = self.driver.list_instances()
        # NOTE(mjozefcz): In this case we need to apply the host filter.
        # Without it, all instance data would be fetched from the DB.
filters['host'] = self.host
instances = objects.InstanceList.get_by_filters(context, filters,
use_slave=True)
name_map = {instance.name: instance for instance in instances}
local_instances = []
for driver_instance in driver_instances:
instance = name_map.get(driver_instance)
if not instance:
continue
local_instances.append(instance)
return local_instances
def _destroy_evacuated_instances(self, context, node_cache):
"""Destroys evacuated instances.
While nova-compute was down, the instances running on it could be
evacuated to another host. This method looks for evacuation migration
records where this is the source host and which were either started
(accepted), in-progress (pre-migrating) or migrated (done). From those
migration records, local instances reported by the hypervisor are
compared to the instances for the migration records and those local
guests are destroyed, along with instance allocation records in
Placement for this node.
        Then allocations are removed from Placement for every instance that
        is evacuated from this host, regardless of whether the instance is
        reported by the hypervisor or not.
:param context: The request context
:param node_cache: A dict of ComputeNode objects keyed by the UUID of
the compute node
:return: A dict keyed by instance uuid mapped to Migration objects
for instances that were migrated away from this host
"""
filters = {
'source_compute': self.host,
# NOTE(mriedem): Migration records that have been accepted are
# included in case the source node comes back up while instances
# are being evacuated to another host. We don't want the same
# instance being reported from multiple hosts.
# NOTE(lyarwood): pre-migrating is also included here as the
# source compute can come back online shortly after the RT
            # claims on the destination that in turn moves the migration to
# pre-migrating. If the evacuate fails on the destination host,
# the user can rebuild the instance (in ERROR state) on the source
# host.
'status': ['accepted', 'pre-migrating', 'done'],
'migration_type': fields.MigrationType.EVACUATION,
}
with utils.temporary_mutation(context, read_deleted='yes'):
evacuations = objects.MigrationList.get_by_filters(context,
filters)
if not evacuations:
return {}
evacuations = {mig.instance_uuid: mig for mig in evacuations}
# TODO(mriedem): We could optimize by pre-loading the joined fields
# we know we'll use, like info_cache and flavor.
local_instances = self._get_instances_on_driver(context)
evacuated_local_instances = {inst.uuid: inst
for inst in local_instances
if inst.uuid in evacuations}
for instance in evacuated_local_instances.values():
LOG.info('Destroying instance as it has been evacuated from '
'this host but still exists in the hypervisor',
instance=instance)
try:
network_info = self.network_api.get_instance_nw_info(
context, instance)
bdi = self._get_instance_block_device_info(context,
instance)
destroy_disks = not (self._is_instance_storage_shared(
context, instance))
except exception.InstanceNotFound:
network_info = network_model.NetworkInfo()
bdi = {}
LOG.info('Instance has been marked deleted already, '
'removing it from the hypervisor.',
instance=instance)
# always destroy disks if the instance was deleted
destroy_disks = True
self.driver.destroy(context, instance,
network_info,
bdi, destroy_disks)
hostname_to_cn_uuid = {
cn.hypervisor_hostname: cn.uuid
for cn in node_cache.values()}
for instance_uuid, migration in evacuations.items():
try:
if instance_uuid in evacuated_local_instances:
# Avoid the db call if we already have the instance loaded
# above
instance = evacuated_local_instances[instance_uuid]
else:
instance = objects.Instance.get_by_uuid(
context, instance_uuid)
except exception.InstanceNotFound:
                # The instance was already deleted, so we expect that every
                # allocation for it has already been cleaned up.
continue
LOG.info('Cleaning up allocations of the instance as it has been '
'evacuated from this host',
instance=instance)
if migration.source_node not in hostname_to_cn_uuid:
LOG.error("Failed to clean allocation of evacuated "
"instance as the source node %s is not found",
migration.source_node, instance=instance)
continue
cn_uuid = hostname_to_cn_uuid[migration.source_node]
# If the instance was deleted in the interim, assume its
# allocations were properly cleaned up (either by its hosting
# compute service or the API).
if (not instance.deleted and
not self.reportclient.
remove_provider_tree_from_instance_allocation(
context, instance.uuid, cn_uuid)):
LOG.error("Failed to clean allocation of evacuated instance "
"on the source node %s",
cn_uuid, instance=instance)
migration.status = 'completed'
migration.save()
return evacuations
def _is_instance_storage_shared(self, context, instance, host=None):
shared_storage = True
data = None
try:
data = self.driver.check_instance_shared_storage_local(context,
instance)
if data:
shared_storage = (self.compute_rpcapi.
check_instance_shared_storage(context,
data, instance=instance, host=host))
except NotImplementedError:
LOG.debug('Hypervisor driver does not support '
'instance shared storage check, '
'assuming it\'s not on shared storage',
instance=instance)
shared_storage = False
except Exception:
            LOG.exception('Failed to check if instance is on shared storage',
instance=instance)
finally:
if data:
self.driver.check_instance_shared_storage_cleanup(context,
data)
return shared_storage
def _complete_partial_deletion(self, context, instance):
"""Complete deletion for instances in DELETED status but not marked as
deleted in the DB
"""
instance.destroy()
bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
context, instance.uuid)
self._complete_deletion(context,
instance)
self._notify_about_instance_usage(context, instance, "delete.end")
compute_utils.notify_about_instance_action(context, instance,
self.host, action=fields.NotificationAction.DELETE,
phase=fields.NotificationPhase.END, bdms=bdms)
def _complete_deletion(self, context, instance):
self._update_resource_tracker(context, instance)
self.reportclient.delete_allocation_for_instance(context,
instance.uuid)
self._clean_instance_console_tokens(context, instance)
self._delete_scheduler_instance_info(context, instance.uuid)
def _validate_pinning_configuration(self, instances):
if not self.driver.capabilities.get('supports_pcpus', False):
return
for instance in instances:
# ignore deleted instances
if instance.deleted:
continue
# if this is an unpinned instance and the host only has
# 'cpu_dedicated_set' configured, we need to tell the operator to
# correct their configuration
if not instance.numa_topology or (
instance.numa_topology.cpu_policy in (
None, fields.CPUAllocationPolicy.SHARED
)
):
# we don't need to check 'vcpu_pin_set' since it can't coexist
# alongside 'cpu_dedicated_set'
if (CONF.compute.cpu_dedicated_set and
not CONF.compute.cpu_shared_set):
msg = _("This host has unpinned instances but has no CPUs "
"set aside for this purpose; configure '[compute] "
"cpu_shared_set' instead of, or in addition to, "
"'[compute] cpu_dedicated_set'")
raise exception.InvalidConfiguration(msg)
continue
# ditto for pinned instances if only 'cpu_shared_set' is configured
if (CONF.compute.cpu_shared_set and
not CONF.compute.cpu_dedicated_set and
not CONF.vcpu_pin_set):
msg = _("This host has pinned instances but has no CPUs "
"set aside for this purpose; configure '[compute] "
"cpu_dedicated_set' instead of, or in addition to, "
"'[compute] cpu_shared_set'.")
raise exception.InvalidConfiguration(msg)
# if this is a mixed instance with both pinned and unpinned CPUs,
# the host must have both 'cpu_dedicated_set' and 'cpu_shared_set'
            # configured. Check if 'cpu_shared_set' is set.
if (instance.numa_topology.cpu_policy ==
fields.CPUAllocationPolicy.MIXED and
not CONF.compute.cpu_shared_set):
msg = _("This host has mixed instance requesting both pinned "
"and unpinned CPUs but hasn't set aside unpinned CPUs "
"for this purpose; Configure "
"'[compute] cpu_shared_set'.")
raise exception.InvalidConfiguration(msg)
            # for a mixed instance, check if 'cpu_dedicated_set' is set
if (instance.numa_topology.cpu_policy ==
fields.CPUAllocationPolicy.MIXED and
not CONF.compute.cpu_dedicated_set):
msg = _("This host has mixed instance requesting both pinned "
"and unpinned CPUs but hasn't set aside pinned CPUs "
"for this purpose; Configure "
"'[compute] cpu_dedicated_set'")
raise exception.InvalidConfiguration(msg)
# also check to make sure the operator hasn't accidentally
# dropped some cores that instances are currently using
available_dedicated_cpus = (hardware.get_vcpu_pin_set() or
hardware.get_cpu_dedicated_set())
pinned_cpus = instance.numa_topology.cpu_pinning
if available_dedicated_cpus and (
pinned_cpus - available_dedicated_cpus):
# we can't raise an exception because of bug #1289064,
# which meant we didn't recalculate CPU pinning information
# when we live migrated a pinned instance
LOG.warning(
"Instance is pinned to host CPUs %(cpus)s "
"but one or more of these CPUs are not included in "
"either '[compute] cpu_dedicated_set' or "
"'vcpu_pin_set'; you should update these "
"configuration options to include the missing CPUs "
"or rebuild or cold migrate this instance.",
{'cpus': list(pinned_cpus)},
instance=instance)
def _validate_vtpm_configuration(self, instances):
if self.driver.capabilities.get('supports_vtpm', False):
return
for instance in instances:
if instance.deleted:
continue
# NOTE(stephenfin): We don't have an attribute on the instance to
# check for this, so we need to inspect the flavor/image metadata
if hardware.get_vtpm_constraint(
instance.flavor, instance.image_meta,
):
msg = _(
'This host has instances with the vTPM feature enabled, '
'but the host is not correctly configured; enable '
'vTPM support.'
)
raise exception.InvalidConfiguration(msg)
def _reset_live_migration(self, context, instance):
migration = None
try:
migration = objects.Migration.get_by_instance_and_status(
context, instance.uuid, 'running')
if migration:
self.live_migration_abort(context, instance, migration.id)
except Exception:
LOG.exception('Failed to abort live-migration',
instance=instance)
finally:
if migration:
self._set_migration_status(migration, 'error')
LOG.info('Instance found in migrating state during '
'startup. Resetting task_state',
instance=instance)
instance.task_state = None
instance.save(expected_task_state=[task_states.MIGRATING])
def _init_instance(self, context, instance):
"""Initialize this instance during service init."""
# NOTE(danms): If the instance appears to not be owned by this
# host, it may have been evacuated away, but skipped by the
# evacuation cleanup code due to configuration. Thus, if that
# is a possibility, don't touch the instance in any way, but
# log the concern. This will help avoid potential issues on
# startup due to misconfiguration.
if instance.host != self.host:
LOG.warning('Instance %(uuid)s appears to not be owned '
'by this host, but by %(host)s. Startup '
'processing is being skipped.',
{'uuid': instance.uuid,
'host': instance.host})
return
        # Instances that are shut down or in an error state cannot be
        # initialized, and no recovery is attempted. The exceptions to this
        # are instances in RESIZE_MIGRATING or DELETING, which are dealt
        # with further down.
if (instance.vm_state == vm_states.SOFT_DELETED or
(instance.vm_state == vm_states.ERROR and
instance.task_state not in
(task_states.RESIZE_MIGRATING, task_states.DELETING))):
LOG.debug("Instance is in %s state.",
instance.vm_state, instance=instance)
return
if instance.vm_state == vm_states.DELETED:
try:
self._complete_partial_deletion(context, instance)
except Exception:
                # we don't want an exception to block init_host
LOG.exception('Failed to complete a deletion',
instance=instance)
return
if (instance.vm_state == vm_states.BUILDING or
instance.task_state in [task_states.SCHEDULING,
task_states.BLOCK_DEVICE_MAPPING,
task_states.NETWORKING,
task_states.SPAWNING]):
            # NOTE(dave-mcnally) compute stopped before instance was fully
            # spawned so set to ERROR state. This is safe to do as the state
            # may be set by the API but the host is not, so if we get here
            # the instance has already been scheduled to this particular host.
LOG.debug("Instance failed to spawn correctly, "
"setting to ERROR state", instance=instance)
self._set_instance_obj_error_state(instance, clean_task_state=True)
return
if (instance.vm_state in [vm_states.ACTIVE, vm_states.STOPPED] and
instance.task_state in [task_states.REBUILDING,
task_states.REBUILD_BLOCK_DEVICE_MAPPING,
task_states.REBUILD_SPAWNING]):
            # NOTE(jichenjc) compute stopped before instance was fully
            # spawned so set to ERROR state. This is consistent with BUILDING.
LOG.debug("Instance failed to rebuild correctly, "
"setting to ERROR state", instance=instance)
self._set_instance_obj_error_state(instance, clean_task_state=True)
return
if (instance.vm_state != vm_states.ERROR and
instance.task_state in [task_states.IMAGE_SNAPSHOT_PENDING,
task_states.IMAGE_PENDING_UPLOAD,
task_states.IMAGE_UPLOADING,
task_states.IMAGE_SNAPSHOT]):
LOG.debug("Instance in transitional state %s at start-up "
"clearing task state",
instance.task_state, instance=instance)
instance.task_state = None
instance.save()
if (instance.vm_state != vm_states.ERROR and
instance.task_state in [task_states.RESIZE_PREP]):
LOG.debug("Instance in transitional state %s at start-up "
"clearing task state",
instance['task_state'], instance=instance)
instance.task_state = None
instance.save()
if instance.task_state == task_states.DELETING:
try:
LOG.info('Service started deleting the instance during '
'the previous run, but did not finish. Restarting'
' the deletion now.', instance=instance)
instance.obj_load_attr('metadata')
instance.obj_load_attr('system_metadata')
bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
context, instance.uuid)
self._delete_instance(context, instance, bdms)
except Exception:
                # we don't want an exception to block init_host
LOG.exception('Failed to complete a deletion',
instance=instance)
self._set_instance_obj_error_state(instance)
return
current_power_state = self._get_power_state(instance)
try_reboot, reboot_type = self._retry_reboot(
instance, current_power_state)
if try_reboot:
LOG.debug("Instance in transitional state (%(task_state)s) at "
"start-up and power state is (%(power_state)s), "
"triggering reboot",
{'task_state': instance.task_state,
'power_state': current_power_state},
instance=instance)
# NOTE(mikal): if the instance was doing a soft reboot that got as
# far as shutting down the instance but not as far as starting it
# again, then we've just become a hard reboot. That means the
# task state for the instance needs to change so that we're in one
# of the expected task states for a hard reboot.
if (instance.task_state in task_states.soft_reboot_states and
reboot_type == 'HARD'):
instance.task_state = task_states.REBOOT_PENDING_HARD
instance.save()
self.reboot_instance(context, instance, block_device_info=None,
reboot_type=reboot_type)
return
elif (current_power_state == power_state.RUNNING and
instance.task_state in [task_states.REBOOT_STARTED,
task_states.REBOOT_STARTED_HARD,
task_states.PAUSING,
task_states.UNPAUSING]):
LOG.warning("Instance in transitional state "
"(%(task_state)s) at start-up and power state "
"is (%(power_state)s), clearing task state",
{'task_state': instance.task_state,
'power_state': current_power_state},
instance=instance)
instance.task_state = None
instance.vm_state = vm_states.ACTIVE
instance.save()
elif (current_power_state == power_state.PAUSED and
instance.task_state == task_states.UNPAUSING):
LOG.warning("Instance in transitional state "
"(%(task_state)s) at start-up and power state "
"is (%(power_state)s), clearing task state "
"and unpausing the instance",
{'task_state': instance.task_state,
'power_state': current_power_state},
instance=instance)
try:
self.unpause_instance(context, instance)
except NotImplementedError:
                # Some virt drivers don't support pause and unpause
pass
except Exception:
LOG.exception('Failed to unpause instance', instance=instance)
return
if instance.task_state == task_states.POWERING_OFF:
try:
LOG.debug("Instance in transitional state %s at start-up "
"retrying stop request",
instance.task_state, instance=instance)
self.stop_instance(context, instance, True)
except Exception:
                # we don't want an exception to block init_host
LOG.exception('Failed to stop instance', instance=instance)
return
if instance.task_state == task_states.POWERING_ON:
try:
LOG.debug("Instance in transitional state %s at start-up "
"retrying start request",
instance.task_state, instance=instance)
self.start_instance(context, instance)
except Exception:
                # we don't want an exception to block init_host
LOG.exception('Failed to start instance', instance=instance)
return
net_info = instance.get_network_info()
try:
self.driver.plug_vifs(instance, net_info)
except NotImplementedError as e:
LOG.debug(e, instance=instance)
except exception.VirtualInterfacePlugException:
# NOTE(mriedem): If we get here, it could be because the vif_type
# in the cache is "binding_failed" or "unbound".
# The periodic task _heal_instance_info_cache checks for this
# condition. It should fix this by binding the ports again when
# it gets to this instance.
LOG.exception('Virtual interface plugging failed for instance. '
'The port binding:host_id may need to be manually '
'updated.', instance=instance)
self._set_instance_obj_error_state(instance)
return
if instance.task_state == task_states.RESIZE_MIGRATING:
# We crashed during resize/migration, so roll back for safety
try:
# NOTE(mriedem): check old_vm_state for STOPPED here, if it's
# not in system_metadata we default to True for backwards
# compatibility
power_on = (instance.system_metadata.get('old_vm_state') !=
vm_states.STOPPED)
block_dev_info = self._get_instance_block_device_info(context,
instance)
migration = objects.Migration.get_by_id_and_instance(
context, instance.migration_context.migration_id,
instance.uuid)
self.driver.finish_revert_migration(context, instance,
net_info, migration, block_dev_info, power_on)
except Exception:
LOG.exception('Failed to revert crashed migration',
instance=instance)
finally:
LOG.info('Instance found in migrating state during '
'startup. Resetting task_state',
instance=instance)
instance.task_state = None
instance.save()
if instance.task_state == task_states.MIGRATING:
# Live migration did not complete, but instance is on this
# host. Abort ongoing migration if still running and reset state.
self._reset_live_migration(context, instance)
db_state = instance.power_state
drv_state = self._get_power_state(instance)
expect_running = (db_state == power_state.RUNNING and
drv_state != db_state)
LOG.debug('Current state is %(drv_state)s, state in DB is '
'%(db_state)s.',
{'drv_state': drv_state, 'db_state': db_state},
instance=instance)
if expect_running and CONF.resume_guests_state_on_host_boot:
self._resume_guests_state(context, instance, net_info)
def _resume_guests_state(self, context, instance, net_info):
LOG.info('Rebooting instance after nova-compute restart.',
instance=instance)
block_device_info = \
self._get_instance_block_device_info(context, instance)
try:
self.driver.resume_state_on_host_boot(
context, instance, net_info, block_device_info)
except NotImplementedError:
            LOG.warning('Hypervisor driver does not support '
                        'resuming guests', instance=instance)
except Exception:
# NOTE(vish): The instance failed to resume, so we set the
# instance to error and attempt to continue.
LOG.warning('Failed to resume instance',
instance=instance)
self._set_instance_obj_error_state(instance)
def _retry_reboot(self, instance, current_power_state):
current_task_state = instance.task_state
retry_reboot = False
reboot_type = compute_utils.get_reboot_type(current_task_state,
current_power_state)
pending_soft = (
current_task_state == task_states.REBOOT_PENDING and
instance.vm_state in vm_states.ALLOW_SOFT_REBOOT)
pending_hard = (
current_task_state == task_states.REBOOT_PENDING_HARD and
instance.vm_state in vm_states.ALLOW_HARD_REBOOT)
started_not_running = (current_task_state in
[task_states.REBOOT_STARTED,
task_states.REBOOT_STARTED_HARD] and
current_power_state != power_state.RUNNING)
if pending_soft or pending_hard or started_not_running:
retry_reboot = True
return retry_reboot, reboot_type
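    # Illustrative decision table for _retry_reboot (editor's sketch, assuming
    # an ACTIVE vm_state and that compute_utils.get_reboot_type() returns
    # 'HARD' whenever the task state is a hard-reboot state or the guest is
    # not RUNNING):
    #
    #     task_state           power_state  retry  reboot_type
    #     REBOOT_PENDING       RUNNING      yes    SOFT
    #     REBOOT_PENDING_HARD  RUNNING      yes    HARD
    #     REBOOT_STARTED       SHUTDOWN     yes    HARD
    #     REBOOT_STARTED       RUNNING      no     -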
def handle_lifecycle_event(self, event):
LOG.info("VM %(state)s (Lifecycle Event)",
{'state': event.get_name()},
instance_uuid=event.get_instance_uuid())
context = nova.context.get_admin_context(read_deleted='yes')
vm_power_state = None
event_transition = event.get_transition()
if event_transition == virtevent.EVENT_LIFECYCLE_STOPPED:
vm_power_state = power_state.SHUTDOWN
elif event_transition == virtevent.EVENT_LIFECYCLE_STARTED:
vm_power_state = power_state.RUNNING
elif event_transition in (
virtevent.EVENT_LIFECYCLE_PAUSED,
virtevent.EVENT_LIFECYCLE_POSTCOPY_STARTED,
virtevent.EVENT_LIFECYCLE_MIGRATION_COMPLETED):
vm_power_state = power_state.PAUSED
elif event_transition == virtevent.EVENT_LIFECYCLE_RESUMED:
vm_power_state = power_state.RUNNING
elif event_transition == virtevent.EVENT_LIFECYCLE_SUSPENDED:
vm_power_state = power_state.SUSPENDED
else:
LOG.warning("Unexpected lifecycle event: %d", event_transition)
migrate_finish_statuses = {
# This happens on the source node and indicates live migration
# entered post-copy mode.
virtevent.EVENT_LIFECYCLE_POSTCOPY_STARTED: 'running (post-copy)',
# Suspended for offline migration.
virtevent.EVENT_LIFECYCLE_MIGRATION_COMPLETED: 'running'
}
expected_attrs = []
if event_transition in migrate_finish_statuses:
# Join on info_cache since that's needed in migrate_instance_start.
expected_attrs.append('info_cache')
instance = objects.Instance.get_by_uuid(context,
event.get_instance_uuid(),
expected_attrs=expected_attrs)
# Note(lpetrut): The event may be delayed, thus not reflecting
# the current instance power state. In that case, ignore the event.
current_power_state = self._get_power_state(instance)
if current_power_state == vm_power_state:
LOG.debug('Synchronizing instance power state after lifecycle '
'event "%(event)s"; current vm_state: %(vm_state)s, '
'current task_state: %(task_state)s, current DB '
'power_state: %(db_power_state)s, VM power_state: '
'%(vm_power_state)s',
{'event': event.get_name(),
'vm_state': instance.vm_state,
'task_state': instance.task_state,
'db_power_state': instance.power_state,
'vm_power_state': vm_power_state},
instance_uuid=instance.uuid)
self._sync_instance_power_state(context,
instance,
vm_power_state)
# The following checks are for live migration. We want to activate
# the port binding for the destination host before the live migration
# is resumed on the destination host in order to reduce network
# downtime. Otherwise the ports are bound to the destination host
# in post_live_migration_at_destination.
# TODO(danms): Explore options for using a different live migration
# specific callback for this instead of piggy-backing on the
# handle_lifecycle_event callback.
if (instance.task_state == task_states.MIGRATING and
event_transition in migrate_finish_statuses):
status = migrate_finish_statuses[event_transition]
try:
migration = objects.Migration.get_by_instance_and_status(
context, instance.uuid, status)
LOG.debug('Binding ports to destination host: %s',
migration.dest_compute, instance=instance)
# For neutron, migrate_instance_start will activate the
# destination host port bindings, if there are any created by
# conductor before live migration started.
self.network_api.migrate_instance_start(
context, instance, migration)
except exception.MigrationNotFoundByStatus:
LOG.warning("Unable to find migration record with status "
"'%s' for instance. Port binding will happen in "
"post live migration.", status, instance=instance)
def handle_events(self, event):
if isinstance(event, virtevent.LifecycleEvent):
try:
self.handle_lifecycle_event(event)
except exception.InstanceNotFound:
LOG.debug("Event %s arrived for non-existent instance. The "
"instance was probably deleted.", event)
else:
LOG.debug("Ignoring event %s", event)
def init_virt_events(self):
if CONF.workarounds.handle_virt_lifecycle_events:
self.driver.register_event_listener(self.handle_events)
else:
# NOTE(mriedem): If the _sync_power_states periodic task is
# disabled we should emit a warning in the logs.
if CONF.sync_power_state_interval < 0:
LOG.warning('Instance lifecycle events from the compute '
'driver have been disabled. Note that lifecycle '
'changes to an instance outside of the compute '
'service will not be synchronized '
'automatically since the _sync_power_states '
'periodic task is also disabled.')
else:
LOG.info('Instance lifecycle events from the compute '
'driver have been disabled. Note that lifecycle '
'changes to an instance outside of the compute '
'service will only be synchronized by the '
'_sync_power_states periodic task.')
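    # Illustrative nova.conf snippet (editor's example) for the trade-off
    # described above: disable lifecycle events but keep the periodic
    # power-state sync as a fallback (600 is an arbitrary interval):
    #
    #     [workarounds]
    #     handle_virt_lifecycle_events = False
    #
    #     [DEFAULT]
    #     sync_power_state_interval = 600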
def _get_nodes(self, context):
"""Queried the ComputeNode objects from the DB that are reported by the
hypervisor.
:param context: the request context
:return: a dict of ComputeNode objects keyed by the UUID of the given
node.
"""
nodes_by_uuid = {}
try:
node_names = self.driver.get_available_nodes()
except exception.VirtDriverNotReady:
LOG.warning(
"Virt driver is not ready. If this is the first time this "
"service is starting on this host, then you can ignore this "
"warning.")
return {}
for node_name in node_names:
try:
node = objects.ComputeNode.get_by_host_and_nodename(
context, self.host, node_name)
nodes_by_uuid[node.uuid] = node
except exception.ComputeHostNotFound:
LOG.warning(
"Compute node %s not found in the database. If this is "
"the first time this service is starting on this host, "
"then you can ignore this warning.", node_name)
return nodes_by_uuid
def init_host(self):
"""Initialization for a standalone compute service."""
if CONF.pci.passthrough_whitelist:
# Simply loading the PCI passthrough whitelist will do a bunch of
# validation that would otherwise wait until the PciDevTracker is
# constructed when updating available resources for the compute
# node(s) in the resource tracker, effectively killing that task.
# So load up the whitelist when starting the compute service to
# flush any invalid configuration early so we can kill the service
# if the configuration is wrong.
whitelist.Whitelist(CONF.pci.passthrough_whitelist)
nova.conf.neutron.register_dynamic_opts(CONF)
# Even if only libvirt uses them, make it available for all drivers
nova.conf.devices.register_dynamic_opts(CONF)
# Override the number of concurrent disk operations allowed if the
# user has specified a limit.
if CONF.compute.max_concurrent_disk_ops != 0:
compute_utils.disk_ops_semaphore = \
eventlet.semaphore.BoundedSemaphore(
CONF.compute.max_concurrent_disk_ops)
if CONF.compute.max_disk_devices_to_attach == 0:
msg = _('[compute]max_disk_devices_to_attach has been set to 0, '
'which will prevent instances from being able to boot. '
'Set -1 for unlimited or set >= 1 to limit the maximum '
'number of disk devices.')
raise exception.InvalidConfiguration(msg)
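        # Illustrative nova.conf values for the two checks above (editor's
        # example; defaults may differ between releases):
        #
        #     [compute]
        #     max_concurrent_disk_ops = 0      # 0 means unlimited
        #     max_disk_devices_to_attach = -1  # -1 unlimited; 0 is rejected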
self.driver.init_host(host=self.host)
context = nova.context.get_admin_context()
instances = objects.InstanceList.get_by_host(
context, self.host,
expected_attrs=['info_cache', 'metadata', 'numa_topology'])
self.init_virt_events()
self._validate_pinning_configuration(instances)
self._validate_vtpm_configuration(instances)
# NOTE(gibi): At this point the compute_nodes of the resource tracker
# has not been populated yet so we cannot rely on the resource tracker
# here.
# NOTE(gibi): If ironic and vcenter virt driver slow start time
# becomes problematic here then we should consider adding a config
# option or a driver flag to tell us if we should thread
# _destroy_evacuated_instances and
# _error_out_instances_whose_build_was_interrupted out in the
# background on startup
nodes_by_uuid = self._get_nodes(context)
try:
# checking that instance was not already evacuated to other host
evacuated_instances = self._destroy_evacuated_instances(
context, nodes_by_uuid)
# Initialise instances on the host that are not evacuating
for instance in instances:
if instance.uuid not in evacuated_instances:
self._init_instance(context, instance)
            # NOTE(gibi): Collect all the instance uuids that were already
            # handled above in some way, either by _init_instance or by
            # _destroy_evacuated_instances. This way we can limit the scope of
            # the _error_out_instances_whose_build_was_interrupted call to
            # look only for instances that have allocations on this node and
            # were not handled by the above calls.
already_handled = {instance.uuid for instance in instances}.union(
evacuated_instances)
self._error_out_instances_whose_build_was_interrupted(
context, already_handled, nodes_by_uuid.keys())
finally:
if instances:
# We only send the instance info to the scheduler on startup
# if there is anything to send, otherwise this host might
# not be mapped yet in a cell and the scheduler may have
# issues dealing with the information. Later changes to
# instances on this host will update the scheduler, or the
# _sync_scheduler_instance_info periodic task will.
self._update_scheduler_instance_info(context, instances)
def _error_out_instances_whose_build_was_interrupted(
self, context, already_handled_instances, node_uuids):
"""If there are instances in BUILDING state that are not
assigned to this host but have allocations in placement towards
this compute that means the nova-compute service was
restarted while those instances waited for the resource claim
to finish and the _set_instance_host_and_node() to update the
instance.host field. We need to push them to ERROR state here to
prevent keeping them in BUILDING state forever.
:param context: The request context
:param already_handled_instances: The set of instance UUIDs that the
host initialization process already handled in some way.
:param node_uuids: The list of compute node uuids handled by this
service
"""
# Strategy:
# 1) Get the allocations from placement for our compute node(s)
# 2) Remove the already handled instances from the consumer list;
# they are either already initialized or need to be skipped.
# 3) Check which remaining consumer is an instance in BUILDING state
# and push it to ERROR state.
LOG.info(
"Looking for unclaimed instances stuck in BUILDING status for "
"nodes managed by this host")
for cn_uuid in node_uuids:
try:
f = self.reportclient.get_allocations_for_resource_provider
allocations = f(context, cn_uuid).allocations
except (exception.ResourceProviderAllocationRetrievalFailed,
keystone_exception.ClientException) as e:
LOG.error(
"Could not retrieve compute node resource provider %s and "
"therefore unable to error out any instances stuck in "
"BUILDING state. Error: %s", cn_uuid, str(e))
continue
not_handled_consumers = (set(allocations) -
already_handled_instances)
if not not_handled_consumers:
continue
filters = {
'vm_state': vm_states.BUILDING,
'uuid': not_handled_consumers
}
instances = objects.InstanceList.get_by_filters(
context, filters, expected_attrs=[])
for instance in instances:
LOG.debug(
"Instance spawn was interrupted before instance_claim, "
"setting instance to ERROR state", instance=instance)
self._set_instance_obj_error_state(
instance, clean_task_state=True)
def cleanup_host(self):
self.driver.register_event_listener(None)
self.instance_events.cancel_all_events()
self.driver.cleanup_host(host=self.host)
self._cleanup_live_migrations_in_pool()
def _cleanup_live_migrations_in_pool(self):
# Shutdown the pool so we don't get new requests.
self._live_migration_executor.shutdown(wait=False)
# For any queued migrations, cancel the migration and update
# its status.
for migration, future in self._waiting_live_migrations.values():
# If we got here before the Future was submitted then we need
# to move on since there isn't anything we can do.
if future is None:
continue
if future.cancel():
self._set_migration_status(migration, 'cancelled')
LOG.info('Successfully cancelled queued live migration.',
instance_uuid=migration.instance_uuid)
else:
LOG.warning('Unable to cancel live migration.',
instance_uuid=migration.instance_uuid)
self._waiting_live_migrations.clear()
def pre_start_hook(self):
"""After the service is initialized, but before we fully bring
the service up by listening on RPC queues, make sure to update
our available resources (and indirectly our available nodes).
"""
self.update_available_resource(nova.context.get_admin_context(),
startup=True)
def _get_power_state(self, instance):
"""Retrieve the power state for the given instance."""
LOG.debug('Checking state', instance=instance)
try:
return self.driver.get_info(instance, use_cache=False).state
except exception.InstanceNotFound:
return power_state.NOSTATE
def _await_block_device_map_created(self, context, vol_id):
# TODO(yamahata): creating volume simultaneously
# reduces creation time?
# TODO(yamahata): eliminate dumb polling
start = time.time()
retries = CONF.block_device_allocate_retries
        # (1) if the configured value is 0, one attempt should be made
        # (2) if the configured value is > 0, then the total number of
        #     attempts is (retries + 1)
attempts = 1
if retries >= 1:
attempts = retries + 1
for attempt in range(1, attempts + 1):
volume = self.volume_api.get(context, vol_id)
volume_status = volume['status']
if volume_status not in ['creating', 'downloading']:
if volume_status == 'available':
return attempt
LOG.warning("Volume id: %(vol_id)s finished being "
"created but its status is %(vol_status)s.",
{'vol_id': vol_id,
'vol_status': volume_status})
break
greenthread.sleep(CONF.block_device_allocate_retries_interval)
raise exception.VolumeNotCreated(volume_id=vol_id,
seconds=int(time.time() - start),
attempts=attempt,
volume_status=volume_status)
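    # Worked example of the retry arithmetic above (editor's note): with
    # block_device_allocate_retries = 0 a single poll is made; with
    # retries = 3 the volume is polled up to 4 times, sleeping
    # block_device_allocate_retries_interval seconds between attempts.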
def _decode_files(self, injected_files):
"""Base64 decode the list of files to inject."""
if not injected_files:
return []
def _decode(f):
path, contents = f
# Py3 raises binascii.Error instead of TypeError as in Py27
try:
decoded = base64.b64decode(contents)
return path, decoded
except (TypeError, binascii.Error):
raise exception.Base64Exception(path=path)
return [_decode(f) for f in injected_files]
def _validate_instance_group_policy(self, context, instance,
scheduler_hints=None):
if CONF.workarounds.disable_group_policy_check_upcall:
return
# NOTE(russellb) Instance group policy is enforced by the scheduler.
# However, there is a race condition with the enforcement of
# the policy. Since more than one instance may be scheduled at the
# same time, it's possible that more than one instance with an
# anti-affinity policy may end up here. It's also possible that
# multiple instances with an affinity policy could end up on different
# hosts. This is a validation step to make sure that starting the
# instance here doesn't violate the policy.
if scheduler_hints is not None:
# only go through here if scheduler_hints is provided, even if it
# is empty.
group_hint = scheduler_hints.get('group')
if not group_hint:
return
else:
# The RequestSpec stores scheduler_hints as key=list pairs so
# we need to check the type on the value and pull the single
# entry out. The API request schema validates that
# the 'group' hint is a single value.
if isinstance(group_hint, list):
group_hint = group_hint[0]
group = objects.InstanceGroup.get_by_hint(context, group_hint)
else:
            # TODO(ganso): a DB call can be saved by adding request_spec
            # to the rpcapi payloads of live_migration, pre_live_migration
            # and check_can_live_migrate_destination
try:
group = objects.InstanceGroup.get_by_instance_uuid(
context, instance.uuid)
except exception.InstanceGroupNotFound:
return
@utils.synchronized(group['uuid'])
def _do_validation(context, instance, group):
if group.policy and 'anti-affinity' == group.policy:
# instances on host
instances_uuids = objects.InstanceList.get_uuids_by_host(
context, self.host)
ins_on_host = set(instances_uuids)
                # The instance param is just for logging; the nodename
                # obtained is not actually related to the instance at all
nodename = self._get_nodename(instance)
# instances being migrated to host
migrations = (
objects.MigrationList.get_in_progress_by_host_and_node(
context, self.host, nodename))
migration_vm_uuids = set([mig['instance_uuid']
for mig in migrations])
total_instances = migration_vm_uuids | ins_on_host
# refresh group to get updated members within locked block
group = objects.InstanceGroup.get_by_uuid(context,
group['uuid'])
members = set(group.members)
# Determine the set of instance group members on this host
# which are not the instance in question. This is used to
# determine how many other members from the same anti-affinity
# group can be on this host.
                members_on_host = (total_instances &
                                   (members - set([instance.uuid])))
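                # Worked example with hypothetical values:
                #   total_instances = {A, B, C}, members = {B, C, D} and
                #   instance.uuid = C -> members_on_host = {B}, i.e. one
                #   other group member already occupies this host.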
rules = group.rules
if rules and 'max_server_per_host' in rules:
max_server = rules['max_server_per_host']
else:
max_server = 1
if len(members_on_host) >= max_server:
msg = _("Anti-affinity instance group policy "
"was violated.")
raise exception.RescheduledException(
instance_uuid=instance.uuid,
reason=msg)
            # NOTE(ganso): The affinity check below does not work reliably
            # and can easily be violated, because the lock is only taken on
            # the local compute host while racing builds proceed on other
            # hosts. The only apparent fix is a DB lock that performs this
            # check whenever the host field is set on an instance.
elif group.policy and 'affinity' == group.policy:
group_hosts = group.get_hosts(exclude=[instance.uuid])
if group_hosts and self.host not in group_hosts:
msg = _("Affinity instance group policy was violated.")
raise exception.RescheduledException(
instance_uuid=instance.uuid,
reason=msg)
_do_validation(context, instance, group)
def _log_original_error(self, exc_info, instance_uuid):
LOG.error('Error: %s', exc_info[1], instance_uuid=instance_uuid,
exc_info=exc_info)
@periodic_task.periodic_task
def _check_instance_build_time(self, context):
"""Ensure that instances are not stuck in build."""
timeout = CONF.instance_build_timeout
if timeout == 0:
return
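        # Example with an assumed config value: instance_build_timeout=3600
        # means any instance still BUILDING more than an hour after its
        # created_at is flipped to ERROR by this periodic task.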
filters = {'vm_state': vm_states.BUILDING,
'host': self.host}
building_insts = objects.InstanceList.get_by_filters(context,
filters, expected_attrs=[], use_slave=True)
for instance in building_insts:
if timeutils.is_older_than(instance.created_at, timeout):
self._set_instance_obj_error_state(instance)
LOG.warning("Instance build timed out. Set to error "
"state.", instance=instance)
def _check_instance_exists(self, instance):
"""Ensure an instance with the same name is not already present."""
if self.driver.instance_exists(instance):
raise exception.InstanceExists(name=instance.name)
def _allocate_network_async(self, context, instance, requested_networks,
security_groups, resource_provider_mapping):
"""Method used to allocate networks in the background.
Broken out for testing.
"""
# First check to see if we're specifically not supposed to allocate
# networks because if so, we can exit early.
if requested_networks and requested_networks.no_allocate:
LOG.debug("Not allocating networking since 'none' was specified.",
instance=instance)
return network_model.NetworkInfo([])
LOG.debug("Allocating IP information in the background.",
instance=instance)
retries = CONF.network_allocate_retries
attempts = retries + 1
retry_time = 1
bind_host_id = self.driver.network_binding_host_id(context, instance)
for attempt in range(1, attempts + 1):
try:
nwinfo = self.network_api.allocate_for_instance(
context, instance,
requested_networks=requested_networks,
security_groups=security_groups,
bind_host_id=bind_host_id,
resource_provider_mapping=resource_provider_mapping)
LOG.debug('Instance network_info: |%s|', nwinfo,
instance=instance)
instance.system_metadata['network_allocated'] = 'True'
# NOTE(JoshNang) do not save the instance here, as it can cause
# races. The caller shares a reference to instance and waits
# for this async greenthread to finish before calling
# instance.save().
return nwinfo
except Exception as e:
log_info = {'attempt': attempt,
'attempts': attempts}
if attempt == attempts:
LOG.exception('Instance failed network setup '
'after %(attempts)d attempt(s)',
log_info)
raise e
LOG.warning('Instance failed network setup '
'(attempt %(attempt)d of %(attempts)d)',
log_info, instance=instance)
time.sleep(retry_time)
retry_time *= 2
if retry_time > 30:
retry_time = 30
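                # Resulting backoff schedule between attempts (illustrative):
                # 1s, 2s, 4s, 8s, 16s, then capped at 30s for every
                # subsequent retry.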
# Not reached.
def _build_networks_for_instance(self, context, instance,
requested_networks, security_groups, resource_provider_mapping):
# If we're here from a reschedule the network may already be allocated.
if strutils.bool_from_string(
instance.system_metadata.get('network_allocated', 'False')):
            # NOTE(alex_xu): network_allocated being True means the network
            # resources were already allocated during a previous scheduling
            # attempt and the network setup was cleaned up on the previous
            # host. After rescheduling, the network resources need to be
            # set up on the new host.
self.network_api.setup_instance_network_on_host(
context, instance, instance.host)
return self.network_api.get_instance_nw_info(context, instance)
network_info = self._allocate_network(context, instance,
requested_networks, security_groups,
resource_provider_mapping)
return network_info
def _allocate_network(self, context, instance, requested_networks,
security_groups, resource_provider_mapping):
"""Start network allocation asynchronously. Return an instance
of NetworkInfoAsyncWrapper that can be used to retrieve the
allocated networks when the operation has finished.
"""
# NOTE(comstud): Since we're allocating networks asynchronously,
# this task state has little meaning, as we won't be in this
# state for very long.
instance.vm_state = vm_states.BUILDING
instance.task_state = task_states.NETWORKING
instance.save(expected_task_state=[None])
return network_model.NetworkInfoAsyncWrapper(
self._allocate_network_async, context, instance,
requested_networks, security_groups, resource_provider_mapping)
def _default_root_device_name(self, instance, image_meta, root_bdm):
"""Gets a default root device name from the driver.
:param nova.objects.Instance instance:
The instance for which to get the root device name.
:param nova.objects.ImageMeta image_meta:
The metadata of the image of the instance.
:param nova.objects.BlockDeviceMapping root_bdm:
The description of the root device.
:returns: str -- The default root device name.
:raises: InternalError, TooManyDiskDevices
"""
try:
return self.driver.default_root_device_name(instance,
image_meta,
root_bdm)
except NotImplementedError:
return compute_utils.get_next_device_name(instance, [])
def _default_device_names_for_instance(self, instance,
root_device_name,
*block_device_lists):
"""Default the missing device names in the BDM from the driver.
:param nova.objects.Instance instance:
The instance for which to get default device names.
:param str root_device_name: The root device name.
:param list block_device_lists: List of block device mappings.
:returns: None
:raises: InternalError, TooManyDiskDevices
"""
try:
self.driver.default_device_names_for_instance(instance,
root_device_name,
*block_device_lists)
except NotImplementedError:
compute_utils.default_device_names_for_instance(
instance, root_device_name, *block_device_lists)
def _get_device_name_for_instance(self, instance, bdms, block_device_obj):
"""Get the next device name from the driver, based on the BDM.
:param nova.objects.Instance instance:
The instance whose volume is requesting a device name.
:param nova.objects.BlockDeviceMappingList bdms:
The block device mappings for the instance.
:param nova.objects.BlockDeviceMapping block_device_obj:
A block device mapping containing info about the requested block
device.
:returns: The next device name.
:raises: InternalError, TooManyDiskDevices
"""
# NOTE(ndipanov): Copy obj to avoid changing the original
block_device_obj = block_device_obj.obj_clone()
try:
return self.driver.get_device_name_for_instance(
instance, bdms, block_device_obj)
except NotImplementedError:
return compute_utils.get_device_name_for_instance(
instance, bdms, block_device_obj.get("device_name"))
def _default_block_device_names(self, instance, image_meta, block_devices):
"""Verify that all the devices have the device_name set. If not,
provide a default name.
        It also ensures that there is a root_device_name, and that it is set
        to the first block device in the boot sequence (boot_index=0).
"""
root_bdm = block_device.get_root_bdm(block_devices)
if not root_bdm:
return
# Get the root_device_name from the root BDM or the instance
root_device_name = None
update_root_bdm = False
if root_bdm.device_name:
root_device_name = root_bdm.device_name
instance.root_device_name = root_device_name
elif instance.root_device_name:
root_device_name = instance.root_device_name
root_bdm.device_name = root_device_name
update_root_bdm = True
else:
root_device_name = self._default_root_device_name(instance,
image_meta,
root_bdm)
instance.root_device_name = root_device_name
root_bdm.device_name = root_device_name
update_root_bdm = True
if update_root_bdm:
root_bdm.save()
ephemerals = []
swap = []
block_device_mapping = []
for device in block_devices:
if block_device.new_format_is_ephemeral(device):
ephemerals.append(device)
if block_device.new_format_is_swap(device):
swap.append(device)
if driver_block_device.is_block_device_mapping(device):
block_device_mapping.append(device)
self._default_device_names_for_instance(instance,
root_device_name,
ephemerals,
swap,
block_device_mapping)
def _block_device_info_to_legacy(self, block_device_info):
"""Convert BDI to the old format for drivers that need it."""
if self.use_legacy_block_device_info:
ephemerals = driver_block_device.legacy_block_devices(
driver.block_device_info_get_ephemerals(block_device_info))
mapping = driver_block_device.legacy_block_devices(
driver.block_device_info_get_mapping(block_device_info))
swap = block_device_info['swap']
if swap:
swap = swap.legacy()
block_device_info.update({
'ephemerals': ephemerals,
'swap': swap,
'block_device_mapping': mapping})
def _add_missing_dev_names(self, bdms, instance):
for bdm in bdms:
if bdm.device_name is not None:
continue
device_name = self._get_device_name_for_instance(instance,
bdms, bdm)
values = {'device_name': device_name}
bdm.update(values)
bdm.save()
def _prep_block_device(self, context, instance, bdms):
"""Set up the block device for an instance with error logging."""
try:
self._add_missing_dev_names(bdms, instance)
block_device_info = driver.get_block_device_info(instance, bdms)
mapping = driver.block_device_info_get_mapping(block_device_info)
driver_block_device.attach_block_devices(
mapping, context, instance, self.volume_api, self.driver,
wait_func=self._await_block_device_map_created)
self._block_device_info_to_legacy(block_device_info)
return block_device_info
except exception.OverQuota as e:
LOG.warning('Failed to create block device for instance due'
' to exceeding volume related resource quota.'
' Error: %s', e.message, instance=instance)
raise
except Exception as ex:
LOG.exception('Instance failed block device setup',
instance=instance)
# InvalidBDM will eventually result in a BuildAbortException when
# booting from volume, and will be recorded as an instance fault.
# Maintain the original exception message which most likely has
# useful details which the standard InvalidBDM error message lacks.
raise exception.InvalidBDM(str(ex))
def _update_instance_after_spawn(self, instance,
vm_state=vm_states.ACTIVE):
instance.power_state = self._get_power_state(instance)
instance.vm_state = vm_state
instance.task_state = None
        # NOTE(sean-k-mooney): configdrive.update_instance checks
        # instance.launched_at to determine if this is the first or a
        # subsequent spawn of the instance. We need to call update_instance
        # before setting instance.launched_at, or instance.config_drive
        # will never be set to true based on the value of
        # force_config_drive. As a result the config drive would be lost on
        # a hard reboot of the instance even when force_config_drive=true.
        # See bug #1835822.
configdrive.update_instance(instance)
instance.launched_at = timeutils.utcnow()
def _update_scheduler_instance_info(self, context, instance):
"""Sends an InstanceList with created or updated Instance objects to
the Scheduler client.
In the case of init_host, the value passed will already be an
InstanceList. Other calls will send individual Instance objects that
have been created or resized. In this case, we create an InstanceList
object containing that Instance.
"""
if not self.send_instance_updates:
return
if isinstance(instance, obj_instance.Instance):
instance = objects.InstanceList(objects=[instance])
context = context.elevated()
self.query_client.update_instance_info(context, self.host,
instance)
def _delete_scheduler_instance_info(self, context, instance_uuid):
"""Sends the uuid of the deleted Instance to the Scheduler client."""
if not self.send_instance_updates:
return
context = context.elevated()
self.query_client.delete_instance_info(context, self.host,
instance_uuid)
@periodic_task.periodic_task(spacing=CONF.scheduler_instance_sync_interval)
def _sync_scheduler_instance_info(self, context):
if not self.send_instance_updates:
return
context = context.elevated()
instances = objects.InstanceList.get_by_host(context, self.host,
expected_attrs=[],
use_slave=True)
uuids = [instance.uuid for instance in instances]
self.query_client.sync_instance_info(context, self.host, uuids)
def _notify_about_instance_usage(self, context, instance, event_suffix,
network_info=None, extra_usage_info=None,
fault=None):
compute_utils.notify_about_instance_usage(
self.notifier, context, instance, event_suffix,
network_info=network_info,
extra_usage_info=extra_usage_info, fault=fault)
def _deallocate_network(self, context, instance,
requested_networks=None):
        # If we were told not to allocate networks, let's save ourselves
        # the trouble of calling the network API.
if requested_networks and requested_networks.no_allocate:
LOG.debug("Skipping network deallocation for instance since "
"networking was not requested.", instance=instance)
return
LOG.debug('Deallocating network for instance', instance=instance)
with timeutils.StopWatch() as timer:
self.network_api.deallocate_for_instance(
context, instance, requested_networks=requested_networks)
# nova-network does an rpc call so we're OK tracking time spent here
LOG.info('Took %0.2f seconds to deallocate network for instance.',
timer.elapsed(), instance=instance)
def _get_instance_block_device_info(self, context, instance,
refresh_conn_info=False,
bdms=None):
"""Transform block devices to the driver block_device format."""
if bdms is None:
bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
context, instance.uuid)
block_device_info = driver.get_block_device_info(instance, bdms)
if not refresh_conn_info:
            # If a block device mapping has no value in connection_info
            # (returned as None), don't include it in the mapping.
block_device_info['block_device_mapping'] = [
bdm for bdm in driver.block_device_info_get_mapping(
block_device_info)
if bdm.get('connection_info')]
else:
driver_block_device.refresh_conn_infos(
driver.block_device_info_get_mapping(block_device_info),
context, instance, self.volume_api, self.driver)
self._block_device_info_to_legacy(block_device_info)
return block_device_info
def _build_failed(self, node):
if CONF.compute.consecutive_build_service_disable_threshold:
# NOTE(danms): Update our counter, but wait for the next
# update_available_resource() periodic to flush it to the DB
self.rt.build_failed(node)
def _build_succeeded(self, node):
self.rt.build_succeeded(node)
@wrap_exception()
@reverts_task_state
@wrap_instance_fault
def build_and_run_instance(self, context, instance, image, request_spec,
filter_properties, accel_uuids, admin_password=None,
injected_files=None, requested_networks=None,
security_groups=None, block_device_mapping=None,
node=None, limits=None, host_list=None):
@utils.synchronized(instance.uuid)
def _locked_do_build_and_run_instance(*args, **kwargs):
# NOTE(danms): We grab the semaphore with the instance uuid
# locked because we could wait in line to build this instance
# for a while and we want to make sure that nothing else tries
# to do anything with this instance while we wait.
with self._build_semaphore:
try:
result = self._do_build_and_run_instance(*args, **kwargs)
except Exception:
# NOTE(mriedem): This should really only happen if
# _decode_files in _do_build_and_run_instance fails, and
# that's before a guest is spawned so it's OK to remove
# allocations for the instance for this node from Placement
# below as there is no guest consuming resources anyway.
# The _decode_files case could be handled more specifically
# but that's left for another day.
result = build_results.FAILED
raise
finally:
if result == build_results.FAILED:
# Remove the allocation records from Placement for the
# instance if the build failed. The instance.host is
# likely set to None in _do_build_and_run_instance
# which means if the user deletes the instance, it
# will be deleted in the API, not the compute service.
# Setting the instance.host to None in
# _do_build_and_run_instance means that the
# ResourceTracker will no longer consider this instance
# to be claiming resources against it, so we want to
# reflect that same thing in Placement. No need to
# call this for a reschedule, as the allocations will
# have already been removed in
# self._do_build_and_run_instance().
self.reportclient.delete_allocation_for_instance(
context, instance.uuid)
if result in (build_results.FAILED,
build_results.RESCHEDULED):
self._build_failed(node)
else:
self._build_succeeded(node)
# NOTE(danms): We spawn here to return the RPC worker thread back to
# the pool. Since what follows could take a really long time, we don't
# want to tie up RPC workers.
utils.spawn_n(_locked_do_build_and_run_instance,
context, instance, image, request_spec,
filter_properties, admin_password, injected_files,
requested_networks, security_groups,
block_device_mapping, node, limits, host_list,
accel_uuids)
def _check_device_tagging(self, requested_networks, block_device_mapping):
tagging_requested = False
if requested_networks:
for net in requested_networks:
if 'tag' in net and net.tag is not None:
tagging_requested = True
break
if block_device_mapping and not tagging_requested:
for bdm in block_device_mapping:
if 'tag' in bdm and bdm.tag is not None:
tagging_requested = True
break
if (tagging_requested and
not self.driver.capabilities.get('supports_device_tagging',
False)):
raise exception.BuildAbortException('Attempt to boot guest with '
'tagged devices on host that '
'does not support tagging.')
def _check_trusted_certs(self, instance):
if (instance.trusted_certs and
not self.driver.capabilities.get('supports_trusted_certs',
False)):
raise exception.BuildAbortException(
'Trusted image certificates provided on host that does not '
'support certificate validation.')
@wrap_exception()
@reverts_task_state
@wrap_instance_event(prefix='compute')
@wrap_instance_fault
def _do_build_and_run_instance(self, context, instance, image,
request_spec, filter_properties, admin_password, injected_files,
requested_networks, security_groups, block_device_mapping,
node=None, limits=None, host_list=None, accel_uuids=None):
try:
LOG.debug('Starting instance...', instance=instance)
instance.vm_state = vm_states.BUILDING
instance.task_state = None
instance.save(expected_task_state=
(task_states.SCHEDULING, None))
except exception.InstanceNotFound:
msg = 'Instance disappeared before build.'
LOG.debug(msg, instance=instance)
return build_results.FAILED
except exception.UnexpectedTaskStateError as e:
LOG.debug(e.format_message(), instance=instance)
return build_results.FAILED
# b64 decode the files to inject:
decoded_files = self._decode_files(injected_files)
if limits is None:
limits = {}
if node is None:
node = self._get_nodename(instance, refresh=True)
try:
with timeutils.StopWatch() as timer:
self._build_and_run_instance(context, instance, image,
decoded_files, admin_password, requested_networks,
security_groups, block_device_mapping, node, limits,
filter_properties, request_spec, accel_uuids)
LOG.info('Took %0.2f seconds to build instance.',
timer.elapsed(), instance=instance)
return build_results.ACTIVE
except exception.RescheduledException as e:
retry = filter_properties.get('retry')
if not retry:
# no retry information, do not reschedule.
LOG.debug("Retry info not present, will not reschedule",
instance=instance)
self._cleanup_allocated_networks(context, instance,
requested_networks)
compute_utils.add_instance_fault_from_exc(context,
instance, e, sys.exc_info(),
fault_message=e.kwargs['reason'])
self._nil_out_instance_obj_host_and_node(instance)
self._set_instance_obj_error_state(instance,
clean_task_state=True)
return build_results.FAILED
LOG.debug(e.format_message(), instance=instance)
# This will be used for logging the exception
retry['exc'] = traceback.format_exception(*sys.exc_info())
# This will be used for setting the instance fault message
retry['exc_reason'] = e.kwargs['reason']
self._cleanup_allocated_networks(context, instance,
requested_networks)
self._nil_out_instance_obj_host_and_node(instance)
instance.task_state = task_states.SCHEDULING
instance.save()
# The instance will have already claimed resources from this host
# before this build was attempted. Now that it has failed, we need
# to unclaim those resources before casting to the conductor, so
# that if there are alternate hosts available for a retry, it can
# claim resources on that new host for the instance.
self.reportclient.delete_allocation_for_instance(context,
instance.uuid)
self.compute_task_api.build_instances(context, [instance],
image, filter_properties, admin_password,
injected_files, requested_networks, security_groups,
block_device_mapping, request_spec=request_spec,
host_lists=[host_list])
return build_results.RESCHEDULED
except (exception.InstanceNotFound,
exception.UnexpectedDeletingTaskStateError):
msg = 'Instance disappeared during build.'
LOG.debug(msg, instance=instance)
self._cleanup_allocated_networks(context, instance,
requested_networks)
return build_results.FAILED
except Exception as e:
if isinstance(e, exception.BuildAbortException):
LOG.error(e.format_message(), instance=instance)
else:
# Should not reach here.
LOG.exception('Unexpected build failure, not rescheduling '
'build.', instance=instance)
self._cleanup_allocated_networks(context, instance,
requested_networks)
self._cleanup_volumes(context, instance,
block_device_mapping, raise_exc=False)
compute_utils.add_instance_fault_from_exc(context, instance,
e, sys.exc_info())
self._nil_out_instance_obj_host_and_node(instance)
self._set_instance_obj_error_state(instance, clean_task_state=True)
return build_results.FAILED
@staticmethod
def _get_scheduler_hints(filter_properties, request_spec=None):
"""Helper method to get scheduler hints.
        This method prefers to get the hints out of the request spec, but
        that might not be provided. Conductor will pass request_spec down
        to the first compute chosen for a build, but older computes will
        not pass the request_spec to conductor's build_instances method for
        a reschedule. So if we're on a host via a retry, request_spec may
        not be provided, in which case we need to fall back to the
        filter_properties to get the scheduler hints.
"""
hints = {}
if request_spec is not None and 'scheduler_hints' in request_spec:
hints = request_spec.scheduler_hints
if not hints:
hints = filter_properties.get('scheduler_hints') or {}
return hints
@staticmethod
def _get_request_group_mapping(request_spec):
"""Return request group resource - provider mapping. This is currently
used for Neutron ports that have resource request due to the port
having QoS minimum bandwidth policy rule attached.
:param request_spec: A RequestSpec object or None
:returns: A dict keyed by RequestGroup requester_id, currently Neutron
port_id, to resource provider UUID that provides resource for that
RequestGroup. Or None if the request_spec was None.
"""
# TODO(sbauza): Remove this conditional once we only support
# RPC API 6.0
if request_spec:
return request_spec.get_request_group_mapping()
else:
return None
def _build_and_run_instance(self, context, instance, image, injected_files,
admin_password, requested_networks, security_groups,
block_device_mapping, node, limits, filter_properties,
request_spec=None, accel_uuids=None):
image_name = image.get('name')
self._notify_about_instance_usage(context, instance, 'create.start',
extra_usage_info={'image_name': image_name})
compute_utils.notify_about_instance_create(
context, instance, self.host,
phase=fields.NotificationPhase.START,
bdms=block_device_mapping)
# NOTE(mikal): cache the keystone roles associated with the instance
# at boot time for later reference
instance.system_metadata.update(
{'boot_roles': ','.join(context.roles)})
self._check_device_tagging(requested_networks, block_device_mapping)
self._check_trusted_certs(instance)
provider_mapping = self._get_request_group_mapping(request_spec)
if provider_mapping:
try:
compute_utils\
.update_pci_request_spec_with_allocated_interface_name(
context, self.reportclient,
instance.pci_requests.requests, provider_mapping)
except (exception.AmbiguousResourceProviderForPCIRequest,
exception.UnexpectedResourceProviderNameForPCIRequest
) as e:
raise exception.BuildAbortException(
reason=str(e), instance_uuid=instance.uuid)
# TODO(Luyao) cut over to get_allocs_for_consumer
allocs = self.reportclient.get_allocations_for_consumer(
context, instance.uuid)
try:
scheduler_hints = self._get_scheduler_hints(filter_properties,
request_spec)
with self.rt.instance_claim(context, instance, node, allocs,
limits):
# NOTE(russellb) It's important that this validation be done
# *after* the resource tracker instance claim, as that is where
# the host is set on the instance.
self._validate_instance_group_policy(context, instance,
scheduler_hints)
image_meta = objects.ImageMeta.from_dict(image)
with self._build_resources(context, instance,
requested_networks, security_groups, image_meta,
block_device_mapping, provider_mapping,
accel_uuids) as resources:
instance.vm_state = vm_states.BUILDING
instance.task_state = task_states.SPAWNING
# NOTE(JoshNang) This also saves the changes to the
# instance from _allocate_network_async, as they aren't
# saved in that function to prevent races.
instance.save(expected_task_state=
task_states.BLOCK_DEVICE_MAPPING)
block_device_info = resources['block_device_info']
network_info = resources['network_info']
accel_info = resources['accel_info']
LOG.debug('Start spawning the instance on the hypervisor.',
instance=instance)
with timeutils.StopWatch() as timer:
self.driver.spawn(context, instance, image_meta,
injected_files, admin_password,
allocs, network_info=network_info,
block_device_info=block_device_info,
accel_info=accel_info)
LOG.info('Took %0.2f seconds to spawn the instance on '
'the hypervisor.', timer.elapsed(),
instance=instance)
except (exception.InstanceNotFound,
exception.UnexpectedDeletingTaskStateError) as e:
with excutils.save_and_reraise_exception():
self._notify_about_instance_usage(context, instance,
'create.error', fault=e)
compute_utils.notify_about_instance_create(
context, instance, self.host,
phase=fields.NotificationPhase.ERROR, exception=e,
bdms=block_device_mapping)
except exception.ComputeResourcesUnavailable as e:
LOG.debug(e.format_message(), instance=instance)
self._notify_about_instance_usage(context, instance,
'create.error', fault=e)
compute_utils.notify_about_instance_create(
context, instance, self.host,
phase=fields.NotificationPhase.ERROR, exception=e,
bdms=block_device_mapping)
raise exception.RescheduledException(
instance_uuid=instance.uuid, reason=e.format_message())
except exception.BuildAbortException as e:
with excutils.save_and_reraise_exception():
LOG.debug(e.format_message(), instance=instance)
self._notify_about_instance_usage(context, instance,
'create.error', fault=e)
compute_utils.notify_about_instance_create(
context, instance, self.host,
phase=fields.NotificationPhase.ERROR, exception=e,
bdms=block_device_mapping)
except exception.NoMoreFixedIps as e:
LOG.warning('No more fixed IP to be allocated',
instance=instance)
self._notify_about_instance_usage(context, instance,
'create.error', fault=e)
compute_utils.notify_about_instance_create(
context, instance, self.host,
phase=fields.NotificationPhase.ERROR, exception=e,
bdms=block_device_mapping)
msg = _('Failed to allocate the network(s) with error %s, '
'not rescheduling.') % e.format_message()
raise exception.BuildAbortException(instance_uuid=instance.uuid,
reason=msg)
except (exception.ExternalNetworkAttachForbidden,
exception.VirtualInterfaceCreateException,
exception.VirtualInterfaceMacAddressException,
exception.FixedIpInvalidOnHost,
exception.UnableToAutoAllocateNetwork,
exception.NetworksWithQoSPolicyNotSupported) as e:
LOG.exception('Failed to allocate network(s)',
instance=instance)
self._notify_about_instance_usage(context, instance,
'create.error', fault=e)
compute_utils.notify_about_instance_create(
                context, instance, self.host,
                phase=fields.NotificationPhase.ERROR, exception=e,
                bdms=block_device_mapping)