# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Handles all processes relating to instances (guest vms).

The :py:class:`ComputeManager` class is a :py:class:`nova.manager.Manager` that
handles RPC calls relating to creating instances. It is responsible for
building a disk image, launching it via the underlying virtualization driver,
responding to calls to check its state, attaching persistent storage, and
terminating it.

"""

import base64
import binascii
import contextlib
import copy
import functools
import inspect
import sys
import time
import traceback
import typing as ty

from cinderclient import exceptions as cinder_exception
from cursive import exception as cursive_exception
import eventlet.event
from eventlet import greenthread
import eventlet.semaphore
import eventlet.timeout
import futurist
from keystoneauth1 import exceptions as keystone_exception
import os_traits
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from oslo_service import loopingcall
from oslo_service import periodic_task
from oslo_utils import excutils
from oslo_utils import strutils
from oslo_utils import timeutils
from oslo_utils import units

from nova.accelerator import cyborg
from nova import block_device
from nova.compute import api as compute
from nova.compute import build_results
from nova.compute import claims
from nova.compute import power_state
from nova.compute import resource_tracker
from nova.compute import rpcapi as compute_rpcapi
from nova.compute import task_states
from nova.compute import utils as compute_utils
from nova.compute.utils import wrap_instance_event
from nova.compute import vm_states
from nova import conductor
import nova.conf
import nova.context
from nova import exception
from nova import exception_wrapper
from nova.i18n import _
from nova.image import glance
from nova import manager
from nova.network import model as network_model
from nova.network import neutron
from nova import objects
from nova.objects import base as obj_base
from nova.objects import external_event as external_event_obj
from nova.objects import fields
from nova.objects import instance as obj_instance
from nova.objects import migrate_data as migrate_data_obj
from nova.pci import request as pci_req_module
from nova.pci import whitelist
from nova import safe_utils
from nova.scheduler.client import query
from nova.scheduler.client import report
from nova.scheduler import utils as scheduler_utils
from nova import utils
from nova.virt import block_device as driver_block_device
from nova.virt import configdrive
from nova.virt import driver
from nova.virt import event as virtevent
from nova.virt import hardware
from nova.virt import storage_users
from nova.virt import virtapi
from nova.volume import cinder

CONF = nova.conf.CONF

LOG = logging.getLogger(__name__)

wrap_exception = functools.partial(
    exception_wrapper.wrap_exception, service='compute', binary='nova-compute')


@contextlib.contextmanager
def errors_out_migration_ctxt(migration):
    """Context manager to error out migration on failure."""

    try:
        yield
    except Exception:
        with excutils.save_and_reraise_exception():
            if migration:
                # We may have been passed None for our migration if we're
                # receiving from an older client. The migration will be
                # errored via the legacy path.
                migration.status = 'error'
                try:
                    migration.save()
                except Exception:
                    LOG.debug(
                        'Error setting migration status for instance %s.',
                        migration.instance_uuid, exc_info=True)


@utils.expects_func_args('migration')
def errors_out_migration(function):
    """Decorator to error out migration on failure."""

    @functools.wraps(function)
    def decorated_function(self, context, *args, **kwargs):
        wrapped_func = safe_utils.get_wrapped_function(function)
        keyed_args = inspect.getcallargs(wrapped_func, self, context,
                                         *args, **kwargs)
        migration = keyed_args['migration']
        with errors_out_migration_ctxt(migration):
            return function(self, context, *args, **kwargs)

    return decorated_function
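
# Editor's sketch (not part of upstream nova) of how the decorator above is
# applied: upstream uses it on ComputeManager methods that take a `migration`
# argument, e.g. confirm_resize, so any unhandled exception flips that
# migration record to 'error' before re-raising:
#
#     @errors_out_migration
#     def confirm_resize(self, context, instance, migration):
#         ...  # an exception here marks `migration` as errored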


@utils.expects_func_args('instance')
def reverts_task_state(function):
    """Decorator to revert task_state on failure."""

    @functools.wraps(function)
    def decorated_function(self, context, *args, **kwargs):
        try:
            return function(self, context, *args, **kwargs)
        except exception.UnexpectedTaskStateError as e:
            # Note(maoy): unexpected task state means the current
            # task is preempted. Do not clear task state in this
            # case.
            with excutils.save_and_reraise_exception():
                LOG.info("Task possibly preempted: %s",
                         e.format_message())
        except Exception:
            with excutils.save_and_reraise_exception():
                wrapped_func = safe_utils.get_wrapped_function(function)
                keyed_args = inspect.getcallargs(wrapped_func, self, context,
                                                 *args, **kwargs)
                # NOTE(mriedem): 'instance' must be in keyed_args because we
                # have utils.expects_func_args('instance') decorating this
                # method.
                instance = keyed_args['instance']
                original_task_state = instance.task_state
                try:
                    self._instance_update(context, instance, task_state=None)
                    LOG.info("Successfully reverted task state from %s on "
                             "failure for instance.",
                             original_task_state, instance=instance)
                except exception.InstanceNotFound:
                    # We might delete an instance that failed to build shortly
                    # after it errored out; this is an expected case and we
                    # should not trace on it.
                    pass
                except Exception as e:
                    LOG.warning("Failed to revert task state for instance. "
                                "Error: %s", e, instance=instance)

    return decorated_function


@utils.expects_func_args('instance')
def wrap_instance_fault(function):
    """Wraps a method to catch exceptions related to instances.

    This decorator wraps a method to catch any exceptions having to do with
    an instance that may get thrown. It then logs an instance fault in the db.
    """

    @functools.wraps(function)
    def decorated_function(self, context, *args, **kwargs):
        try:
            return function(self, context, *args, **kwargs)
        except exception.InstanceNotFound:
            raise
        except Exception as e:
            # NOTE(gtt): If argument 'instance' is in args rather than kwargs,
            # we will get a KeyError exception which will cover up the real
            # exception. So, we update kwargs with the values from args first.
            # Then, we can get 'instance' from kwargs easily.
            kwargs.update(dict(zip(function.__code__.co_varnames[2:], args)))

            with excutils.save_and_reraise_exception():
                compute_utils.add_instance_fault_from_exc(context,
                    kwargs['instance'], e, sys.exc_info())

    return decorated_function


@utils.expects_func_args('image_id', 'instance')
def delete_image_on_error(function):
    """Used for snapshot related methods to ensure the image created in
    compute.api is deleted when an error occurs.
    """

    @functools.wraps(function)
    def decorated_function(self, context, image_id, instance,
                           *args, **kwargs):
        try:
            return function(self, context, image_id, instance,
                            *args, **kwargs)
        except Exception:
            with excutils.save_and_reraise_exception():
                compute_utils.delete_image(
                    context, instance, self.image_api, image_id,
                    log_exc_info=True)

    return decorated_function


# Each collection of events is a dict of eventlet Events keyed by a tuple of
# event name and associated tag
_InstanceEvents = ty.Dict[ty.Tuple[str, str], eventlet.event.Event]
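
# For illustration (editor's note): with an event named 'network-vif-plugged'
# tagged with a port id, a populated collection would look roughly like
#     {('network-vif-plugged', '<port-uuid>'): <eventlet.event.Event>}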


class InstanceEvents(object):
    def __init__(self):
        self._events: ty.Optional[ty.Dict[str, _InstanceEvents]] = {}

    @staticmethod
    def _lock_name(instance) -> str:
        return '%s-%s' % (instance.uuid, 'events')

    def prepare_for_instance_event(
        self,
        instance: 'objects.Instance',
        name: str,
        tag: str,
    ) -> eventlet.event.Event:
        """Prepare to receive an event for an instance.

        This will register an event for the given instance that we will
        wait on later. This should be called before initiating whatever
        action will trigger the event. The resulting eventlet.event.Event
        object should be wait()'d on to ensure completion.

        :param instance: the instance for which the event will be generated
        :param name: the name of the event we're expecting
        :param tag: the tag associated with the event we're expecting
        :returns: an event object that should be wait()'d on
        """
        @utils.synchronized(self._lock_name(instance))
        def _create_or_get_event():
            if self._events is None:
                # NOTE(danms): We really should have a more specific error
                # here, but this is what we use for our default error case
                raise exception.NovaException(
                    'In shutdown, no new events can be scheduled')

            instance_events = self._events.setdefault(instance.uuid, {})
            return instance_events.setdefault((name, tag),
                                              eventlet.event.Event())
        LOG.debug('Preparing to wait for external event %(name)s-%(tag)s',
                  {'name': name, 'tag': tag}, instance=instance)
        return _create_or_get_event()
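
    # Editor's usage sketch, assuming the compute manager's `instance_events`
    # attribute: register the event before triggering the action that
    # produces it, then block on the returned Event:
    #
    #     event = self.instance_events.prepare_for_instance_event(
    #         instance, 'network-vif-plugged', port_id)
    #     ...  # kick off the operation that emits the external event
    #     event.wait()  # woken by pop_instance_event() when the event lands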

    def pop_instance_event(self, instance, event):
        """Remove a pending event from the wait list.

        This will remove a pending event from the wait list so that it
        can be used to signal the waiters to wake up.

        :param instance: the instance for which the event was generated
        :param event: the nova.objects.external_event.InstanceExternalEvent
                      that describes the event
        :returns: the eventlet.event.Event object on which the waiters
                  are blocked
        """
        no_events_sentinel = object()
        no_matching_event_sentinel = object()

        @utils.synchronized(self._lock_name(instance))
        def _pop_event():
            if self._events is None:
                LOG.debug('Unexpected attempt to pop events during shutdown',
                          instance=instance)
                return no_events_sentinel
            events = self._events.get(instance.uuid)
            if not events:
                return no_events_sentinel
            _event = events.pop((event.name, event.tag), None)
            if not events:
                del self._events[instance.uuid]
            if _event is None:
                return no_matching_event_sentinel
            return _event

        result = _pop_event()
        if result is no_events_sentinel:
            LOG.debug('No waiting events found dispatching %(event)s',
                      {'event': event.key},
                      instance=instance)
            return None
        elif result is no_matching_event_sentinel:
            LOG.debug(
                'No event matching %(event)s in %(events)s',
                {
                    'event': event.key,
                    # mypy can't identify the none check in _pop_event
                    'events': self._events.get(  # type: ignore
                        instance.uuid, {}).keys(),
                },
                instance=instance,
            )
            return None
        else:
            return result

    def clear_events_for_instance(self, instance):
        """Remove all pending events for an instance.

        This will remove all events currently pending for an instance
        and return them (indexed by event name).

        :param instance: the instance for which events should be purged
        :returns: a dictionary of {event_name: eventlet.event.Event}
        """
        @utils.synchronized(self._lock_name(instance))
        def _clear_events():
            if self._events is None:
                LOG.debug('Unexpected attempt to clear events during shutdown',
                          instance=instance)
                return dict()
            # NOTE(danms): We have historically returned the raw internal
            # format here, which is {event.key: [events, ...])} so just
            # trivially convert it here.
            return {'%s-%s' % k: e
                    for k, e in self._events.pop(instance.uuid, {}).items()}
        return _clear_events()

    def cancel_all_events(self):
        if self._events is None:
            LOG.debug('Unexpected attempt to cancel events during shutdown.')
            return
        our_events = self._events
        # NOTE(danms): Block new events
        self._events = None

        for instance_uuid, events in our_events.items():
            for (name, tag), eventlet_event in events.items():
                LOG.debug('Canceling in-flight event %(name)s-%(tag)s for '
                          'instance %(instance_uuid)s',
                          {'name': name,
                           'tag': tag,
                           'instance_uuid': instance_uuid})
                event = objects.InstanceExternalEvent(
                    instance_uuid=instance_uuid,
                    name=name, status='failed',
                    tag=tag, data={})
                eventlet_event.send(event)


class ComputeVirtAPI(virtapi.VirtAPI):
    def __init__(self, compute):
        super(ComputeVirtAPI, self).__init__()
        self._compute = compute
        self.reportclient = compute.reportclient

        class ExitEarly(Exception):
            def __init__(self, events):
                super(Exception, self).__init__()
                self.events = events

        self._exit_early_exc = ExitEarly

    def exit_wait_early(self, events):
        """Exit a wait_for_instance_event() immediately and avoid
        waiting for some events.

        :param events: A list of (name, tag) tuples for events that we should
                       skip waiting for during a wait_for_instance_event().
        """
        raise self._exit_early_exc(events=events)

    def _default_error_callback(self, event_name, instance):
        raise exception.NovaException(_('Instance event failed'))

    class _InstanceEvent:
        EXPECTED = "expected"
        WAITING = "waiting"
        RECEIVED = "received"
        RECEIVED_EARLY = "received early"
        TIMED_OUT = "timed out"
        RECEIVED_NOT_PROCESSED = "received but not processed"

        def __init__(self, name: str, event: eventlet.event.Event) -> None:
            self.name = name
            self.event = event
            self.status = self.EXPECTED
            self.wait_time = None

        def mark_as_received_early(self) -> None:
            self.status = self.RECEIVED_EARLY

        def is_received_early(self) -> bool:
            return self.status == self.RECEIVED_EARLY

        def _update_status_no_wait(self):
            if self.status == self.EXPECTED and self.event.ready():
                self.status = self.RECEIVED_NOT_PROCESSED

        def wait(self) -> 'objects.InstanceExternalEvent':
            self.status = self.WAITING
            try:
                with timeutils.StopWatch() as sw:
                    instance_event = self.event.wait()
            except eventlet.timeout.Timeout:
                self.status = self.TIMED_OUT
                self.wait_time = sw.elapsed()

                raise

            self.status = self.RECEIVED
            self.wait_time = sw.elapsed()
            return instance_event

        def __str__(self) -> str:
            self._update_status_no_wait()
            if self.status == self.EXPECTED:
                return f"{self.name}: expected but not received"
            if self.status == self.RECEIVED:
                return (
                    f"{self.name}: received after waiting "
                    f"{self.wait_time:.2f} seconds")
            if self.status == self.TIMED_OUT:
                return (
                    f"{self.name}: timed out after "
                    f"{self.wait_time:.2f} seconds")
            return f"{self.name}: {self.status}"

    @staticmethod
    def _wait_for_instance_events(
        instance: 'objects.Instance',
        events: dict,
        error_callback: ty.Callable,
    ) -> None:
        for event_name, event in events.items():
            if event.is_received_early():
                continue
            else:
                actual_event = event.wait()
                if actual_event.status == 'completed':
                    continue
            # If we get here, we have an event that was not completed,
            # nor skipped via exit_wait_early(). Decide whether to
            # keep waiting by calling the error_callback() hook.
            decision = error_callback(event_name, instance)
            if decision is False:
                break

    @contextlib.contextmanager
    def wait_for_instance_event(self, instance, event_names, deadline=300,
                                error_callback=None):
        """Plan to wait for some events, run some code, then wait.

        This context manager will first create plans to wait for the
        provided event_names, yield, and then wait for all the scheduled
        events to complete.

        Note that this uses an eventlet.timeout.Timeout to bound the
        operation, so callers should be prepared to catch that
        failure and handle that situation appropriately.

        If the event is not received by the specified timeout deadline,
        eventlet.timeout.Timeout is raised.

        If the event is received but did not have a 'completed'
        status, a NovaException is raised. If an error_callback is
        provided, instead of raising an exception as detailed above
        for the failure case, the callback will be called with the
        event_name and instance, and can return True to continue
        waiting for the rest of the events, False to stop processing,
        or raise an exception which will bubble up to the waiter.

        If the inner code wishes to abort waiting for one or more
        events because it knows some state to be finished or condition
        to be satisfied, it can use VirtAPI.exit_wait_early() with a
        list of event (name,tag) items to avoid waiting for those
        events upon context exit. Note that exit_wait_early() exits
        the context immediately and should be used to signal that all
        work has been completed and provide the unified list of events
        that need not be waited for. Waiting for the remaining events
        will begin immediately upon early exit as if the context was
        exited normally.

        :param instance: The instance for which an event is expected
        :param event_names: A list of event names. Each element is a
                            tuple of strings to indicate (name, tag),
                            where name is required, but tag may be None.
        :param deadline: Maximum number of seconds we should wait for all
                         of the specified events to arrive.
        :param error_callback: A function to be called if an event arrives

        """

        if error_callback is None:
            error_callback = self._default_error_callback
        events = {}
        for event_name in event_names:
            name, tag = event_name
            event_name = objects.InstanceExternalEvent.make_key(name, tag)
            try:
                event = (
                    self._compute.instance_events.prepare_for_instance_event(
                        instance, name, tag))
                events[event_name] = self._InstanceEvent(event_name, event)
            except exception.NovaException:
                error_callback(event_name, instance)
                # NOTE(danms): Don't wait for any of the events. They
                # should all be canceled and fired immediately below,
                # but don't stick around if not.
                deadline = 0
        try:
            yield
        except self._exit_early_exc as e:
            early_events = set([objects.InstanceExternalEvent.make_key(n, t)
                                for n, t in e.events])

            # If there are expected events that were received early, mark
            # them so they won't be waited for later
            for early_event_name in early_events:
                if early_event_name in events:
                    events[early_event_name].mark_as_received_early()

        sw = timeutils.StopWatch()
        sw.start()
        try:
            with eventlet.timeout.Timeout(deadline):
                self._wait_for_instance_events(
                    instance, events, error_callback)
        except eventlet.timeout.Timeout:
            LOG.warning(
                'Timeout waiting for %(events)s for instance with '
                'vm_state %(vm_state)s and task_state %(task_state)s. '
                'Event states are: %(event_states)s',
                {
                    'events': list(events.keys()),
                    'vm_state': instance.vm_state,
                    'task_state': instance.task_state,
                    'event_states':
                        ', '.join([str(event) for event in events.values()]),
                },
                instance=instance)

            raise

        LOG.debug('Instance event wait completed in %i seconds for %s',
                  sw.elapsed(),
                  ','.join(x[0] for x in event_names),
                  instance=instance)
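
    # Editor's usage sketch of the context manager above, patterned on how
    # callers wait for Neutron 'network-vif-plugged' events; `port_id` and
    # `plug_the_vifs` are illustrative names, not upstream API:
    #
    #     events = [('network-vif-plugged', port_id)]
    #     with virtapi.wait_for_instance_event(instance, events,
    #                                          deadline=300):
    #         plug_the_vifs()  # the action that triggers the event
    #     # leaving the block waits for the events or raises on timeout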

    def update_compute_provider_status(self, context, rp_uuid, enabled):
        """Used to add/remove the COMPUTE_STATUS_DISABLED trait on the provider

        :param context: nova auth RequestContext
        :param rp_uuid: UUID of a compute node resource provider in Placement
        :param enabled: True if the node is enabled in which case the trait
            would be removed, False if the node is disabled in which case
            the trait would be added.
        :raises: ResourceProviderTraitRetrievalFailed
        :raises: ResourceProviderUpdateConflict
        :raises: ResourceProviderUpdateFailed
        :raises: TraitRetrievalFailed
        :raises: keystoneauth1.exceptions.ClientException
        """
        trait_name = os_traits.COMPUTE_STATUS_DISABLED
        # Get the current traits (and generation) for the provider.
        # TODO(mriedem): Leverage the ProviderTree cache in get_provider_traits
        trait_info = self.reportclient.get_provider_traits(context, rp_uuid)
        # If the host is enabled, remove the trait (if set), else add
        # the trait if it doesn't already exist.
        original_traits = trait_info.traits
        new_traits = None
        if enabled and trait_name in original_traits:
            new_traits = original_traits - {trait_name}
            LOG.debug('Removing trait %s from compute node resource '
                      'provider %s in placement.', trait_name, rp_uuid)
        elif not enabled and trait_name not in original_traits:
            new_traits = original_traits | {trait_name}
            LOG.debug('Adding trait %s to compute node resource '
                      'provider %s in placement.', trait_name, rp_uuid)

        if new_traits is not None:
            self.reportclient.set_traits_for_provider(
                context, rp_uuid, new_traits, generation=trait_info.generation)
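
    # Editor's sketch of the intended effect: disabling a compute service
    # would drive a call like the one below, after which the resource
    # provider carries the COMPUTE_STATUS_DISABLED trait until re-enabled:
    #
    #     virtapi.update_compute_provider_status(ctxt, rp_uuid, enabled=False)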


class ComputeManager(manager.Manager):
    """Manages the running instances from creation to destruction."""

    target = messaging.Target(version='6.0')

    def __init__(self, compute_driver=None, *args, **kwargs):
        """Load configuration options and connect to the hypervisor."""
        # We want the ComputeManager, ResourceTracker and ComputeVirtAPI all
        # using the same instance of SchedulerReportClient which has the
        # ProviderTree cache for this compute service.
        self.reportclient = report.SchedulerReportClient()
        self.virtapi = ComputeVirtAPI(self)
        self.network_api = neutron.API()
        self.volume_api = cinder.API()
        self.image_api = glance.API()
        self._last_bw_usage_poll = 0.0
        self.compute_api = compute.API()
        self.compute_rpcapi = compute_rpcapi.ComputeAPI()
        self.compute_task_api = conductor.ComputeTaskAPI()
        self.query_client = query.SchedulerQueryClient()
        self.instance_events = InstanceEvents()
        self._sync_power_pool = eventlet.GreenPool(
            size=CONF.sync_power_state_pool_size)
        self._syncs_in_progress = {}
        self.send_instance_updates = (
            CONF.filter_scheduler.track_instance_changes)
        if CONF.max_concurrent_builds != 0:
            self._build_semaphore = eventlet.semaphore.Semaphore(
                CONF.max_concurrent_builds)
        else:
            self._build_semaphore = compute_utils.UnlimitedSemaphore()
        if CONF.max_concurrent_snapshots > 0:
            self._snapshot_semaphore = eventlet.semaphore.Semaphore(
                CONF.max_concurrent_snapshots)
        else:
            self._snapshot_semaphore = compute_utils.UnlimitedSemaphore()
        if CONF.max_concurrent_live_migrations > 0:
            self._live_migration_executor = futurist.GreenThreadPoolExecutor(
                max_workers=CONF.max_concurrent_live_migrations)
        else:
            # CONF.max_concurrent_live_migrations is 0 (unlimited)
            self._live_migration_executor = futurist.GreenThreadPoolExecutor()
        # This is a dict, keyed by instance uuid, to a two-item tuple of
        # migration object and Future for the queued live migration.
        self._waiting_live_migrations = {}

        super(ComputeManager, self).__init__(service_name="compute",
                                             *args, **kwargs)

        # TODO(sbauza): Remove this call once we delete the V5Proxy class
        self.additional_endpoints.append(_ComputeV5Proxy(self))

        # NOTE(russellb) Load the driver last. It may call back into the
        # compute manager via the virtapi, so we want it to be fully
        # initialized before that happens.
        self.driver = driver.load_compute_driver(self.virtapi, compute_driver)
        self.rt = resource_tracker.ResourceTracker(
            self.host, self.driver, reportclient=self.reportclient)

    def reset(self):
        LOG.info('Reloading compute RPC API')
        compute_rpcapi.reset_globals()
        self.compute_rpcapi = compute_rpcapi.ComputeAPI()
        self.reportclient.clear_provider_cache()

    def _update_resource_tracker(self, context, instance):
        """Let the resource tracker know that an instance has changed state."""

        if instance.host == self.host:
            self.rt.update_usage(context, instance, instance.node)

    def _instance_update(self, context, instance, **kwargs):
        """Update an instance in the database using kwargs as values."""

        for k, v in kwargs.items():
            setattr(instance, k, v)
        instance.save()
        self._update_resource_tracker(context, instance)

    def _nil_out_instance_obj_host_and_node(self, instance):
        # NOTE(jwcroppe): We don't do instance.save() here for performance
        # reasons; a call to this is expected to be immediately followed by
        # another call that does instance.save(), thus avoiding two writes
        # to the database layer.
        instance.host = None
        instance.node = None
        # ResourceTracker._set_instance_host_and_node also sets launched_on
        # to the same value as host and is really only ever used by legacy
        # nova-network code, but we should also null it out to avoid confusion
        # if there is an instance in the database with no host set but
        # launched_on is set. Note that we do not care about using launched_on
        # as some kind of debug helper if diagnosing a build failure, that is
        # what instance action events are for.
        instance.launched_on = None
        # If the instance is not on a host, it's not in an aggregate and
        # therefore is not in an availability zone.
        instance.availability_zone = None

    def _set_instance_obj_error_state(self, instance, clean_task_state=False):
        try:
            instance.vm_state = vm_states.ERROR
            if clean_task_state:
                instance.task_state = None
            instance.save()
        except exception.InstanceNotFound:
            LOG.debug('Instance has been destroyed from under us while '
                      'trying to set it to ERROR', instance=instance)

    def _get_instances_on_driver(self, context, filters=None):
        """Return a list of instance records for the instances found
        on the hypervisor which satisfy the specified filters. If filters=None
        return a list of instance records for all the instances found on the
        hypervisor.
        """
        if not filters:
            filters = {}
        try:
            driver_uuids = self.driver.list_instance_uuids()
            if len(driver_uuids) == 0:
                # Short circuit, don't waste a DB call
                return objects.InstanceList()
            filters['uuid'] = driver_uuids
            local_instances = objects.InstanceList.get_by_filters(
                context, filters, use_slave=True)
            return local_instances
        except NotImplementedError:
            pass

        # The driver doesn't support uuids listing, so we'll have
        # to brute force.
        driver_instances = self.driver.list_instances()
        # NOTE(mjozefcz): In this case we need to apply host filter.
        # Without this all instance data would be fetched from db.
        filters['host'] = self.host
        instances = objects.InstanceList.get_by_filters(context, filters,
                                                        use_slave=True)
        name_map = {instance.name: instance for instance in instances}
        local_instances = []
        for driver_instance in driver_instances:
            instance = name_map.get(driver_instance)
            if not instance:
                continue
            local_instances.append(instance)
        return local_instances

    def _destroy_evacuated_instances(self, context, node_cache):
        """Destroys evacuated instances.

        While nova-compute was down, the instances running on it could be
        evacuated to another host. This method looks for evacuation migration
        records where this is the source host and which were either started
        (accepted), in-progress (pre-migrating) or migrated (done). From those
        migration records, local instances reported by the hypervisor are
        compared to the instances for the migration records and those local
        guests are destroyed, along with instance allocation records in
        Placement for this node.
        Then allocations are removed from Placement for every instance that is
        evacuated from this host regardless of whether the instance is
        reported by the hypervisor or not.

        :param context: The request context
        :param node_cache: A dict of ComputeNode objects keyed by the UUID of
            the compute node
        :return: A dict keyed by instance uuid mapped to Migration objects
            for instances that were migrated away from this host
        """
        filters = {
            'source_compute': self.host,
            # NOTE(mriedem): Migration records that have been accepted are
            # included in case the source node comes back up while instances
            # are being evacuated to another host. We don't want the same
            # instance being reported from multiple hosts.
            # NOTE(lyarwood): pre-migrating is also included here as the
            # source compute can come back online shortly after the RT
            # claims on the destination that in-turn moves the migration to
            # pre-migrating. If the evacuate fails on the destination host,
            # the user can rebuild the instance (in ERROR state) on the source
            # host.
            'status': ['accepted', 'pre-migrating', 'done'],
            'migration_type': fields.MigrationType.EVACUATION,
        }
        with utils.temporary_mutation(context, read_deleted='yes'):
            evacuations = objects.MigrationList.get_by_filters(context,
                                                               filters)
        if not evacuations:
            return {}
        evacuations = {mig.instance_uuid: mig for mig in evacuations}

        # TODO(mriedem): We could optimize by pre-loading the joined fields
        # we know we'll use, like info_cache and flavor.
        local_instances = self._get_instances_on_driver(context)
        evacuated_local_instances = {inst.uuid: inst
                                     for inst in local_instances
                                     if inst.uuid in evacuations}

        for instance in evacuated_local_instances.values():
            LOG.info('Destroying instance as it has been evacuated from '
                     'this host but still exists in the hypervisor',
                     instance=instance)
            try:
                network_info = self.network_api.get_instance_nw_info(
                    context, instance)
                bdi = self._get_instance_block_device_info(context,
                                                           instance)
                destroy_disks = not (self._is_instance_storage_shared(
                    context, instance))
            except exception.InstanceNotFound:
                network_info = network_model.NetworkInfo()
                bdi = {}
                LOG.info('Instance has been marked deleted already, '
                         'removing it from the hypervisor.',
                         instance=instance)
                # always destroy disks if the instance was deleted
                destroy_disks = True
            self.driver.destroy(context, instance,
                                network_info,
                                bdi, destroy_disks)

        hostname_to_cn_uuid = {
            cn.hypervisor_hostname: cn.uuid
            for cn in node_cache.values()}

        for instance_uuid, migration in evacuations.items():
            try:
                if instance_uuid in evacuated_local_instances:
                    # Avoid the db call if we already have the instance loaded
                    # above
                    instance = evacuated_local_instances[instance_uuid]
                else:
                    instance = objects.Instance.get_by_uuid(
                        context, instance_uuid)
            except exception.InstanceNotFound:
                # The instance was already deleted so we expect that every
                # allocation of that instance has already been cleaned up
                continue

            LOG.info('Cleaning up allocations of the instance as it has been '
                     'evacuated from this host',
                     instance=instance)
            if migration.source_node not in hostname_to_cn_uuid:
                LOG.error("Failed to clean allocation of evacuated "
                          "instance as the source node %s is not found",
                          migration.source_node, instance=instance)
                continue
            cn_uuid = hostname_to_cn_uuid[migration.source_node]

            # If the instance was deleted in the interim, assume its
            # allocations were properly cleaned up (either by its hosting
            # compute service or the API).
            if (not instance.deleted and
                    not self.reportclient.
                        remove_provider_tree_from_instance_allocation(
                            context, instance.uuid, cn_uuid)):
                LOG.error("Failed to clean allocation of evacuated instance "
                          "on the source node %s",
                          cn_uuid, instance=instance)

            migration.status = 'completed'
            migration.save()
        return evacuations

    def _is_instance_storage_shared(self, context, instance, host=None):
        shared_storage = True
        data = None
        try:
            data = self.driver.check_instance_shared_storage_local(context,
                                                                   instance)
            if data:
                shared_storage = (self.compute_rpcapi.
                                  check_instance_shared_storage(context,
                                      data, instance=instance, host=host))
        except NotImplementedError:
            LOG.debug('Hypervisor driver does not support '
                      'instance shared storage check, '
                      'assuming it\'s not on shared storage',
                      instance=instance)
            shared_storage = False
        except Exception:
            LOG.exception('Failed to check if instance shared',
                          instance=instance)
        finally:
            if data:
                self.driver.check_instance_shared_storage_cleanup(context,
                                                                  data)
        return shared_storage

    def _complete_partial_deletion(self, context, instance):
        """Complete deletion for instances in DELETED status but not marked as
        deleted in the DB
        """
        instance.destroy()
        bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
            context, instance.uuid)
        self._complete_deletion(context,
                                instance)
        self._notify_about_instance_usage(context, instance, "delete.end")
        compute_utils.notify_about_instance_action(context, instance,
            self.host, action=fields.NotificationAction.DELETE,
            phase=fields.NotificationPhase.END, bdms=bdms)

    def _complete_deletion(self, context, instance):
        self._update_resource_tracker(context, instance)

        # If we're configured to do deferred deletes, don't force deletion of
        # allocations if there's a conflict.
        force = False if CONF.reclaim_instance_interval > 0 else True

        self.reportclient.delete_allocation_for_instance(context,
                                                         instance.uuid,
                                                         force=force)

        self._clean_instance_console_tokens(context, instance)
        self._delete_scheduler_instance_info(context, instance.uuid)

    def _validate_pinning_configuration(self, instances):
        if not self.driver.capabilities.get('supports_pcpus', False):
            return

        for instance in instances:
            # ignore deleted instances
            if instance.deleted:
                continue

            # if this is an unpinned instance and the host only has
            # 'cpu_dedicated_set' configured, we need to tell the operator to
            # correct their configuration
            if not instance.numa_topology or (
                instance.numa_topology.cpu_policy in (
                    None, fields.CPUAllocationPolicy.SHARED
                )
            ):
                # we don't need to check 'vcpu_pin_set' since it can't coexist
                # alongside 'cpu_dedicated_set'
                if (CONF.compute.cpu_dedicated_set and
                        not CONF.compute.cpu_shared_set):
                    msg = _("This host has unpinned instances but has no CPUs "
                            "set aside for this purpose; configure '[compute] "
                            "cpu_shared_set' instead of, or in addition to, "
                            "'[compute] cpu_dedicated_set'")
                    raise exception.InvalidConfiguration(msg)

                continue

            # ditto for pinned instances if only 'cpu_shared_set' is configured
            if (CONF.compute.cpu_shared_set and
                    not CONF.compute.cpu_dedicated_set and
                    not CONF.vcpu_pin_set):
                msg = _("This host has pinned instances but has no CPUs "
                        "set aside for this purpose; configure '[compute] "
                        "cpu_dedicated_set' instead of, or in addition to, "
                        "'[compute] cpu_shared_set'.")
                raise exception.InvalidConfiguration(msg)

            # if this is a mixed instance with both pinned and unpinned CPUs,
            # the host must have both 'cpu_dedicated_set' and 'cpu_shared_set'
            # configured. check if 'cpu_shared_set' is set.
            if (instance.numa_topology.cpu_policy ==
                    fields.CPUAllocationPolicy.MIXED and
                    not CONF.compute.cpu_shared_set):
                msg = _("This host has mixed instance requesting both pinned "
                        "and unpinned CPUs but hasn't set aside unpinned CPUs "
                        "for this purpose; Configure "
                        "'[compute] cpu_shared_set'.")
                raise exception.InvalidConfiguration(msg)

            # for mixed instance check if 'cpu_dedicated_set' is set.
            if (instance.numa_topology.cpu_policy ==
                    fields.CPUAllocationPolicy.MIXED and
                    not CONF.compute.cpu_dedicated_set):
                msg = _("This host has mixed instance requesting both pinned "
                        "and unpinned CPUs but hasn't set aside pinned CPUs "
                        "for this purpose; Configure "
                        "'[compute] cpu_dedicated_set'")
                raise exception.InvalidConfiguration(msg)

            # also check to make sure the operator hasn't accidentally
            # dropped some cores that instances are currently using
            available_dedicated_cpus = (hardware.get_vcpu_pin_set() or
                                        hardware.get_cpu_dedicated_set())
            pinned_cpus = instance.numa_topology.cpu_pinning
            if available_dedicated_cpus and (
                    pinned_cpus - available_dedicated_cpus):
                # we can't raise an exception because of bug #1289064,
                # which meant we didn't recalculate CPU pinning information
                # when we live migrated a pinned instance
                LOG.warning(
                    "Instance is pinned to host CPUs %(cpus)s "
                    "but one or more of these CPUs are not included in "
                    "either '[compute] cpu_dedicated_set' or "
                    "'vcpu_pin_set'; you should update these "
                    "configuration options to include the missing CPUs "
                    "or rebuild or cold migrate this instance.",
                    {'cpus': list(pinned_cpus)},
                    instance=instance)

    def _validate_vtpm_configuration(self, instances):
        if self.driver.capabilities.get('supports_vtpm', False):
            return

        for instance in instances:
            if instance.deleted:
                continue

            # NOTE(stephenfin): We don't have an attribute on the instance to
            # check for this, so we need to inspect the flavor/image metadata
            if hardware.get_vtpm_constraint(
                instance.flavor, instance.image_meta,
            ):
                msg = _(
                    'This host has instances with the vTPM feature enabled, '
                    'but the host is not correctly configured; enable '
                    'vTPM support.'
                )
                raise exception.InvalidConfiguration(msg)

    def _reset_live_migration(self, context, instance):
        migration = None
        try:
            migration = objects.Migration.get_by_instance_and_status(
                context, instance.uuid, 'running')
            if migration:
                self.live_migration_abort(context, instance, migration.id)
        except Exception:
            LOG.exception('Failed to abort live-migration',
                          instance=instance)
        finally:
            if migration:
                self._set_migration_status(migration, 'error')
            LOG.info('Instance found in migrating state during '
                     'startup. Resetting task_state',
                     instance=instance)
            instance.task_state = None
            instance.save(expected_task_state=[task_states.MIGRATING])

    def _init_instance(self, context, instance):
        """Initialize this instance during service init."""

        # NOTE(danms): If the instance appears to not be owned by this
        # host, it may have been evacuated away, but skipped by the
        # evacuation cleanup code due to configuration. Thus, if that
        # is a possibility, don't touch the instance in any way, but
        # log the concern. This will help avoid potential issues on
        # startup due to misconfiguration.
        if instance.host != self.host:
            LOG.warning('Instance %(uuid)s appears to not be owned '
                        'by this host, but by %(host)s. Startup '
                        'processing is being skipped.',
                        {'uuid': instance.uuid,
                         'host': instance.host})
            return

        # Instances that are shut down or in an error state cannot be
        # initialized and are not attempted to be recovered. The exception
        # to this are instances that are in RESIZE_MIGRATING or DELETING,
        # which are dealt with further down.
        if (instance.vm_state == vm_states.SOFT_DELETED or
            (instance.vm_state == vm_states.ERROR and
             instance.task_state not in
             (task_states.RESIZE_MIGRATING, task_states.DELETING))):
            LOG.debug("Instance is in %s state.",
                      instance.vm_state, instance=instance)
            return

        if instance.vm_state == vm_states.DELETED:
            try:
                self._complete_partial_deletion(context, instance)
            except Exception:
                # we don't want an exception to block init_host
                LOG.exception('Failed to complete a deletion',
                              instance=instance)
            return

        if (instance.vm_state == vm_states.BUILDING or
                instance.task_state in [task_states.SCHEDULING,
                                        task_states.BLOCK_DEVICE_MAPPING,
                                        task_states.NETWORKING,
                                        task_states.SPAWNING]):
            # NOTE(dave-mcnally) compute stopped before instance was fully
            # spawned so set to ERROR state. This is safe to do as the state
            # may be set by the api but the host is not so if we get here the
            # instance has already been scheduled to this particular host.
            LOG.debug("Instance failed to spawn correctly, "
                      "setting to ERROR state", instance=instance)
            self._set_instance_obj_error_state(instance, clean_task_state=True)
            return

        if (instance.vm_state in [vm_states.ACTIVE, vm_states.STOPPED] and
                instance.task_state in [task_states.REBUILDING,
                                        task_states.REBUILD_BLOCK_DEVICE_MAPPING,
                                        task_states.REBUILD_SPAWNING]):
            # NOTE(jichenjc) compute stopped before instance was fully
            # spawned so set to ERROR state. This is consistent with BUILD
            LOG.debug("Instance failed to rebuild correctly, "
                      "setting to ERROR state", instance=instance)
            self._set_instance_obj_error_state(instance, clean_task_state=True)
            return

        if (instance.vm_state != vm_states.ERROR and
                instance.task_state in [task_states.IMAGE_SNAPSHOT_PENDING,
                                        task_states.IMAGE_PENDING_UPLOAD,
                                        task_states.IMAGE_UPLOADING,
                                        task_states.IMAGE_SNAPSHOT]):
            LOG.debug("Instance in transitional state %s at start-up "
                      "clearing task state",
                      instance.task_state, instance=instance)
            instance.task_state = None
            instance.save()

        if (instance.vm_state != vm_states.ERROR and
                instance.task_state in [task_states.RESIZE_PREP]):
            LOG.debug("Instance in transitional state %s at start-up "
                      "clearing task state",
                      instance['task_state'], instance=instance)
            instance.task_state = None
            instance.save()

        if instance.task_state == task_states.DELETING:
            try:
                LOG.info('Service started deleting the instance during '
                         'the previous run, but did not finish. Restarting'
                         ' the deletion now.', instance=instance)
                instance.obj_load_attr('metadata')
                instance.obj_load_attr('system_metadata')
                bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
                    context, instance.uuid)
                self._delete_instance(context, instance, bdms)
            except Exception:
                # we don't want an exception to block init_host
                LOG.exception('Failed to complete a deletion',
                              instance=instance)
                self._set_instance_obj_error_state(instance)
            return

        current_power_state = self._get_power_state(instance)
        try_reboot, reboot_type = self._retry_reboot(
            instance, current_power_state)

        if try_reboot:
            LOG.debug("Instance in transitional state (%(task_state)s) at "
                      "start-up and power state is (%(power_state)s), "
                      "triggering reboot",
                      {'task_state': instance.task_state,
                       'power_state': current_power_state},
                      instance=instance)

            # NOTE(mikal): if the instance was doing a soft reboot that got as
            # far as shutting down the instance but not as far as starting it
            # again, then we've just become a hard reboot. That means the
            # task state for the instance needs to change so that we're in one
            # of the expected task states for a hard reboot.
            if (instance.task_state in task_states.soft_reboot_states and
                    reboot_type == 'HARD'):
                instance.task_state = task_states.REBOOT_PENDING_HARD
                instance.save()

            self.reboot_instance(context, instance, block_device_info=None,
                                 reboot_type=reboot_type)
            return

        elif (current_power_state == power_state.RUNNING and
              instance.task_state in [task_states.REBOOT_STARTED,
                                      task_states.REBOOT_STARTED_HARD,
                                      task_states.PAUSING,
                                      task_states.UNPAUSING]):
            LOG.warning("Instance in transitional state "
                        "(%(task_state)s) at start-up and power state "
                        "is (%(power_state)s), clearing task state",
                        {'task_state': instance.task_state,
                         'power_state': current_power_state},
                        instance=instance)
            instance.task_state = None
            instance.vm_state = vm_states.ACTIVE
            instance.save()
        elif (current_power_state == power_state.PAUSED and
              instance.task_state == task_states.UNPAUSING):
            LOG.warning("Instance in transitional state "
                        "(%(task_state)s) at start-up and power state "
                        "is (%(power_state)s), clearing task state "
                        "and unpausing the instance",
                        {'task_state': instance.task_state,
                         'power_state': current_power_state},
                        instance=instance)
            try:
                self.unpause_instance(context, instance)
            except NotImplementedError:
                # Some virt drivers don't support pause and unpause
                pass
            except Exception:
                LOG.exception('Failed to unpause instance', instance=instance)
            return

        if instance.task_state == task_states.POWERING_OFF:
            try:
                LOG.debug("Instance in transitional state %s at start-up "
                          "retrying stop request",
                          instance.task_state, instance=instance)
                self.stop_instance(context, instance, True)
            except Exception:
                # we don't want an exception to block init_host
                LOG.exception('Failed to stop instance', instance=instance)
            return

        if instance.task_state == task_states.POWERING_ON:
            try:
                LOG.debug("Instance in transitional state %s at start-up "
                          "retrying start request",
                          instance.task_state, instance=instance)
                self.start_instance(context, instance)
            except Exception:
                # we don't want an exception to block init_host
                LOG.exception('Failed to start instance', instance=instance)
            return

        net_info = instance.get_network_info()
        try:
            self.driver.plug_vifs(instance, net_info)
        except NotImplementedError as e:
            LOG.debug(e, instance=instance)
        except exception.VirtualInterfacePlugException:
            # NOTE(mriedem): If we get here, it could be because the vif_type
            # in the cache is "binding_failed" or "unbound".
            # The periodic task _heal_instance_info_cache checks for this
            # condition. It should fix this by binding the ports again when
            # it gets to this instance.
            LOG.exception('Virtual interface plugging failed for instance. '
                          'The port binding:host_id may need to be manually '
                          'updated.', instance=instance)
            self._set_instance_obj_error_state(instance)
            return

        if instance.task_state == task_states.RESIZE_MIGRATING:
            # We crashed during resize/migration, so roll back for safety
            try:
                # NOTE(mriedem): check old_vm_state for STOPPED here, if it's
                # not in system_metadata we default to True for backwards
                # compatibility
                power_on = (instance.system_metadata.get('old_vm_state') !=
                            vm_states.STOPPED)

                block_dev_info = self._get_instance_block_device_info(context,
                                                                      instance)

                migration = objects.Migration.get_by_id_and_instance(
                    context, instance.migration_context.migration_id,
                    instance.uuid)
                self.driver.finish_revert_migration(context, instance,
                    net_info, migration, block_dev_info, power_on)

            except Exception:
                LOG.exception('Failed to revert crashed migration',
                              instance=instance)
            finally:
                LOG.info('Instance found in migrating state during '
                         'startup. Resetting task_state',
                         instance=instance)
                instance.task_state = None
                instance.save()
        if instance.task_state == task_states.MIGRATING:
            # Live migration did not complete, but instance is on this
            # host. Abort ongoing migration if still running and reset state.
            self._reset_live_migration(context, instance)

        db_state = instance.power_state
        drv_state = self._get_power_state(instance)
        expect_running = (db_state == power_state.RUNNING and
                          drv_state != db_state)

        LOG.debug('Current state is %(drv_state)s, state in DB is '
                  '%(db_state)s.',
                  {'drv_state': drv_state, 'db_state': db_state},
                  instance=instance)

        if expect_running and CONF.resume_guests_state_on_host_boot:
            self._resume_guests_state(context, instance, net_info)

    def _resume_guests_state(self, context, instance, net_info):
        LOG.info('Rebooting instance after nova-compute restart.',
                 instance=instance)
        block_device_info = \
            self._get_instance_block_device_info(context, instance)

        try:
            self.driver.resume_state_on_host_boot(
                context, instance, net_info, block_device_info)
        except NotImplementedError:
            LOG.warning('Hypervisor driver does not support '
                        'resume guests', instance=instance)
        except Exception:
            # NOTE(vish): The instance failed to resume, so we set the
            # instance to error and attempt to continue.
            LOG.warning('Failed to resume instance',
                        instance=instance)
            self._set_instance_obj_error_state(instance)

    def _retry_reboot(self, instance, current_power_state):
        current_task_state = instance.task_state
        retry_reboot = False
        reboot_type = compute_utils.get_reboot_type(current_task_state,
                                                    current_power_state)

        pending_soft = (
            current_task_state == task_states.REBOOT_PENDING and
            instance.vm_state in vm_states.ALLOW_SOFT_REBOOT)
        pending_hard = (
            current_task_state == task_states.REBOOT_PENDING_HARD and
            instance.vm_state in vm_states.ALLOW_HARD_REBOOT)
        started_not_running = (current_task_state in
                               [task_states.REBOOT_STARTED,
                                task_states.REBOOT_STARTED_HARD] and
                               current_power_state != power_state.RUNNING)

        if pending_soft or pending_hard or started_not_running:
            retry_reboot = True

        return retry_reboot, reboot_type
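
    # Editor's summary of the decision above: a reboot is retried at start-up
    # when the task state is REBOOT_PENDING / REBOOT_PENDING_HARD (and the
    # vm_state allows that reboot type), or when a reboot already started
    # (REBOOT_STARTED / REBOOT_STARTED_HARD) but the guest is not RUNNING;
    # otherwise the caller just clears the stale task state.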

    def handle_lifecycle_event(self, event):
        LOG.info("VM %(state)s (Lifecycle Event)",
                 {'state': event.get_name()},
                 instance_uuid=event.get_instance_uuid())
        context = nova.context.get_admin_context(read_deleted='yes')
        vm_power_state = None
        event_transition = event.get_transition()
        if event_transition == virtevent.EVENT_LIFECYCLE_STOPPED:
            vm_power_state = power_state.SHUTDOWN
        elif event_transition == virtevent.EVENT_LIFECYCLE_STARTED:
            vm_power_state = power_state.RUNNING
        elif event_transition in (
                virtevent.EVENT_LIFECYCLE_PAUSED,
                virtevent.EVENT_LIFECYCLE_POSTCOPY_STARTED,
                virtevent.EVENT_LIFECYCLE_MIGRATION_COMPLETED):
            vm_power_state = power_state.PAUSED
        elif event_transition == virtevent.EVENT_LIFECYCLE_RESUMED:
            vm_power_state = power_state.RUNNING
        elif event_transition == virtevent.EVENT_LIFECYCLE_SUSPENDED:
            vm_power_state = power_state.SUSPENDED
        else:
            LOG.warning("Unexpected lifecycle event: %d", event_transition)

        migrate_finish_statuses = {
            # This happens on the source node and indicates live migration
            # entered post-copy mode.
            virtevent.EVENT_LIFECYCLE_POSTCOPY_STARTED: 'running (post-copy)',
            # Suspended for offline migration.
            virtevent.EVENT_LIFECYCLE_MIGRATION_COMPLETED: 'running'
        }

        expected_attrs = []
        if event_transition in migrate_finish_statuses:
            # Join on info_cache since that's needed in migrate_instance_start.
            expected_attrs.append('info_cache')
        instance = objects.Instance.get_by_uuid(context,
                                                event.get_instance_uuid(),
                                                expected_attrs=expected_attrs)

        # Note(lpetrut): The event may be delayed, thus not reflecting
        # the current instance power state. In that case, ignore the event.
        current_power_state = self._get_power_state(instance)
        if current_power_state == vm_power_state:
            LOG.debug('Synchronizing instance power state after lifecycle '
                      'event "%(event)s"; current vm_state: %(vm_state)s, '
                      'current task_state: %(task_state)s, current DB '
                      'power_state: %(db_power_state)s, VM power_state: '
                      '%(vm_power_state)s',
                      {'event': event.get_name(),
                       'vm_state': instance.vm_state,
                       'task_state': instance.task_state,
                       'db_power_state': instance.power_state,
                       'vm_power_state': vm_power_state},
                      instance_uuid=instance.uuid)
            self._sync_instance_power_state(context,
                                            instance,
                                            vm_power_state)

        # The following checks are for live migration. We want to activate
        # the port binding for the destination host before the live migration
        # is resumed on the destination host in order to reduce network
        # downtime. Otherwise the ports are bound to the destination host
        # in post_live_migration_at_destination.
        # TODO(danms): Explore options for using a different live migration
        # specific callback for this instead of piggy-backing on the
        # handle_lifecycle_event callback.
        if (instance.task_state == task_states.MIGRATING and
                event_transition in migrate_finish_statuses):
            status = migrate_finish_statuses[event_transition]
            try:
                migration = objects.Migration.get_by_instance_and_status(
                    context, instance.uuid, status)
                LOG.debug('Binding ports to destination host: %s',
                          migration.dest_compute, instance=instance)
                # For neutron, migrate_instance_start will activate the
                # destination host port bindings, if there are any created by
                # conductor before live migration started.
                self.network_api.migrate_instance_start(
                    context, instance, migration)
            except exception.MigrationNotFoundByStatus:
                LOG.warning("Unable to find migration record with status "
                            "'%s' for instance. Port binding will happen in "
                            "post live migration.", status, instance=instance)

    def handle_events(self, event):
        if isinstance(event, virtevent.LifecycleEvent):
            try:
                self.handle_lifecycle_event(event)
            except exception.InstanceNotFound:
                LOG.debug("Event %s arrived for non-existent instance. The "
                          "instance was probably deleted.", event)
        else:
            LOG.debug("Ignoring event %s", event)

    def init_virt_events(self):
        if CONF.workarounds.handle_virt_lifecycle_events:
            self.driver.register_event_listener(self.handle_events)
        else:
            # NOTE(mriedem): If the _sync_power_states periodic task is
            # disabled we should emit a warning in the logs.
            if CONF.sync_power_state_interval < 0:
                LOG.warning('Instance lifecycle events from the compute '
                            'driver have been disabled. Note that lifecycle '
                            'changes to an instance outside of the compute '
                            'service will not be synchronized '
                            'automatically since the _sync_power_states '
                            'periodic task is also disabled.')
            else:
                LOG.info('Instance lifecycle events from the compute '
                         'driver have been disabled. Note that lifecycle '
                         'changes to an instance outside of the compute '
                         'service will only be synchronized by the '
                         '_sync_power_states periodic task.')

    def _get_nodes(self, context):
        """Query the ComputeNode objects from the DB that are reported by the
        hypervisor.

        :param context: the request context
        :return: a dict of ComputeNode objects keyed by the UUID of the given
            node.
        """
        nodes_by_uuid = {}
        try:
            node_names = self.driver.get_available_nodes()
        except exception.VirtDriverNotReady:
            LOG.warning(
                "Virt driver is not ready. If this is the first time this "
                "service is starting on this host, then you can ignore this "
                "warning.")
            return {}

        for node_name in node_names:
            try:
                node = objects.ComputeNode.get_by_host_and_nodename(
                    context, self.host, node_name)
                nodes_by_uuid[node.uuid] = node
            except exception.ComputeHostNotFound:
                LOG.warning(
                    "Compute node %s not found in the database. If this is "
                    "the first time this service is starting on this host, "
                    "then you can ignore this warning.", node_name)
        return nodes_by_uuid
|
|
|
def init_host(self): |
|
"""Initialization for a standalone compute service.""" |
|
|
|
if CONF.pci.device_spec: |
|
# Simply loading the PCI passthrough spec will do a bunch of |
|
# validation that would otherwise wait until the PciDevTracker is |
|
# constructed when updating available resources for the compute |
|
# node(s) in the resource tracker, effectively killing that task. |
|
# So load up the spec when starting the compute service to |
|
# flush any invalid configuration early, so we can kill the service |
|
# if the configuration is wrong. |
|
whitelist.Whitelist(CONF.pci.device_spec) |
|
|
|
nova.conf.neutron.register_dynamic_opts(CONF) |
|
# Even if only libvirt uses them, make it available for all drivers |
|
nova.conf.devices.register_dynamic_opts(CONF) |
|
|
|
# Override the number of concurrent disk operations allowed if the |
|
# user has specified a limit. |
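
        # A value of 0 (the default) skips the override, i.e. no limit is

        # imposed here.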
|
if CONF.compute.max_concurrent_disk_ops != 0: |
|
compute_utils.disk_ops_semaphore = \ |
|
eventlet.semaphore.BoundedSemaphore( |
|
CONF.compute.max_concurrent_disk_ops) |
|
|
|
if CONF.compute.max_disk_devices_to_attach == 0: |
|
msg = _('[compute]max_disk_devices_to_attach has been set to 0, ' |
|
'which will prevent instances from being able to boot. ' |
|
'Set -1 for unlimited or set >= 1 to limit the maximum ' |
|
'number of disk devices.') |
|
raise exception.InvalidConfiguration(msg) |
|
|
|
self.driver.init_host(host=self.host) |
|
context = nova.context.get_admin_context() |
|
instances = objects.InstanceList.get_by_host( |
|
context, self.host, |
|
expected_attrs=['info_cache', 'metadata', 'numa_topology']) |
|
|
|
self.init_virt_events() |
|
|
|
self._validate_pinning_configuration(instances) |
|
self._validate_vtpm_configuration(instances) |
|
|
|
# NOTE(gibi): At this point the compute_nodes of the resource tracker |
|
# has not been populated yet so we cannot rely on the resource tracker |
|
# here. |
|
# NOTE(gibi): If ironic and vcenter virt driver slow start time |
|
# becomes problematic here then we should consider adding a config |
|
# option or a driver flag to tell us if we should thread |
|
# _destroy_evacuated_instances and |
|
# _error_out_instances_whose_build_was_interrupted out in the |
|
# background on startup |
|
nodes_by_uuid = self._get_nodes(context) |
|
|
|
try: |
|
            # Check for instances that were already evacuated to another host
|
evacuated_instances = self._destroy_evacuated_instances( |
|
context, nodes_by_uuid) |
|
|
|
# Initialise instances on the host that are not evacuating |
|
for instance in instances: |
|
if instance.uuid not in evacuated_instances: |
|
self._init_instance(context, instance) |
|
|
|
            # NOTE(gibi): collect all the instance uuids that were already

            # handled above in some way, either by _init_instance or by

            # _destroy_evacuated_instances. This way we can limit the scope of

            # the _error_out_instances_whose_build_was_interrupted call to

            # instances that have allocations on this node but were not

            # handled by the calls above.
|
already_handled = {instance.uuid for instance in instances}.union( |
|
evacuated_instances) |
|
self._error_out_instances_whose_build_was_interrupted( |
|
context, already_handled, nodes_by_uuid.keys()) |
|
|
|
finally: |
|
if instances: |
|
# We only send the instance info to the scheduler on startup |
|
# if there is anything to send, otherwise this host might |
|
# not be mapped yet in a cell and the scheduler may have |
|
# issues dealing with the information. Later changes to |
|
# instances on this host will update the scheduler, or the |
|
# _sync_scheduler_instance_info periodic task will. |
|
self._update_scheduler_instance_info(context, instances) |
|
|
|
def _error_out_instances_whose_build_was_interrupted( |
|
self, context, already_handled_instances, node_uuids): |
|
"""If there are instances in BUILDING state that are not |
|
assigned to this host but have allocations in placement towards |
|
this compute that means the nova-compute service was |
|
restarted while those instances waited for the resource claim |
|
to finish and the _set_instance_host_and_node() to update the |
|
instance.host field. We need to push them to ERROR state here to |
|
prevent keeping them in BUILDING state forever. |
|
|
|
:param context: The request context |
|
:param already_handled_instances: The set of instance UUIDs that the |
|
host initialization process already handled in some way. |
|
:param node_uuids: The list of compute node uuids handled by this |
|
service |
|
""" |
|
|
|
# Strategy: |
|
# 1) Get the allocations from placement for our compute node(s) |
|
# 2) Remove the already handled instances from the consumer list; |
|
# they are either already initialized or need to be skipped. |
|
# 3) Check which remaining consumer is an instance in BUILDING state |
|
# and push it to ERROR state. |
|
|
|
LOG.info( |
|
"Looking for unclaimed instances stuck in BUILDING status for " |
|
"nodes managed by this host") |
|
for cn_uuid in node_uuids: |
|
try: |
|
f = self.reportclient.get_allocations_for_resource_provider |
|
allocations = f(context, cn_uuid).allocations |
|
except (exception.ResourceProviderAllocationRetrievalFailed, |
|
keystone_exception.ClientException) as e: |
|
LOG.error( |
|
"Could not retrieve compute node resource provider %s and " |
|
"therefore unable to error out any instances stuck in " |
|
"BUILDING state. Error: %s", cn_uuid, str(e)) |
|
continue |
|
|
|
not_handled_consumers = (set(allocations) - |
|
already_handled_instances) |
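
            # Placement allocations are keyed by consumer UUID, which for an

            # in-progress build is the instance UUID; consumers that are not

            # instances (e.g. migration allocations) simply won't match the

            # BUILDING query below.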
|
|
|
if not not_handled_consumers: |
|
continue |
|
|
|
filters = { |
|
'vm_state': vm_states.BUILDING, |
|
'uuid': not_handled_consumers |
|
} |
|
|
|
instances = objects.InstanceList.get_by_filters( |
|
context, filters, expected_attrs=[]) |
|
|
|
for instance in instances: |
|
LOG.debug( |
|
"Instance spawn was interrupted before instance_claim, " |
|
"setting instance to ERROR state", instance=instance) |
|
self._set_instance_obj_error_state( |
|
instance, clean_task_state=True) |
|
|
|
def cleanup_host(self): |
|
self.driver.register_event_listener(None) |
|
self.instance_events.cancel_all_events() |
|
self.driver.cleanup_host(host=self.host) |
|
self._cleanup_live_migrations_in_pool() |
|
|
|
def _cleanup_live_migrations_in_pool(self): |
|
# Shutdown the pool so we don't get new requests. |
|
self._live_migration_executor.shutdown(wait=False) |
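
        # wait=False so service shutdown is not blocked on migrations that

        # are already running; queued ones are cancelled below.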
|
# For any queued migrations, cancel the migration and update |
|
# its status. |
|
for migration, future in self._waiting_live_migrations.values(): |
|
# If we got here before the Future was submitted then we need |
|
# to move on since there isn't anything we can do. |
|
if future is None: |
|
continue |
|
if future.cancel(): |
|
self._set_migration_status(migration, 'cancelled') |
|
LOG.info('Successfully cancelled queued live migration.', |
|
instance_uuid=migration.instance_uuid) |
|
else: |
|
LOG.warning('Unable to cancel live migration.', |
|
instance_uuid=migration.instance_uuid) |
|
self._waiting_live_migrations.clear() |
|
|
|
def pre_start_hook(self): |
|
"""After the service is initialized, but before we fully bring |
|
the service up by listening on RPC queues, make sure to update |
|
our available resources (and indirectly our available nodes). |
|
""" |
|
self.update_available_resource(nova.context.get_admin_context(), |
|
startup=True) |
|
|
|
def _get_power_state(self, instance): |
|
"""Retrieve the power state for the given instance.""" |
|
LOG.debug('Checking state', instance=instance) |
|
try: |
|
return self.driver.get_info(instance, use_cache=False).state |
|
except exception.InstanceNotFound: |
|
return power_state.NOSTATE |
|
|
|
def _await_block_device_map_created(self, context, vol_id): |
|
# TODO(yamahata): creating volume simultaneously |
|
# reduces creation time? |
|
# TODO(yamahata): eliminate dumb polling |
|
start = time.time() |
|
retries = CONF.block_device_allocate_retries |
|
# (1) if the configured value is 0, one attempt should be made |
|
        # (2) if the configured value is > 0, then the total number of

        # attempts is (retries + 1)
|
attempts = 1 |
|
if retries >= 1: |
|
attempts = retries + 1 |
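
        # e.g. block_device_allocate_retries=3 with a 3 second interval

        # means up to 4 polls over roughly 9 seconds before

        # VolumeNotCreated is raised.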
|
for attempt in range(1, attempts + 1): |
|
volume = self.volume_api.get(context, vol_id) |
|
volume_status = volume['status'] |
|
if volume_status not in ['creating', 'downloading']: |
|
if volume_status == 'available': |
|
return attempt |
|
LOG.warning("Volume id: %(vol_id)s finished being " |
|
"created but its status is %(vol_status)s.", |
|
{'vol_id': vol_id, |
|
'vol_status': volume_status}) |
|
break |
|
greenthread.sleep(CONF.block_device_allocate_retries_interval) |
|
raise exception.VolumeNotCreated(volume_id=vol_id, |
|
seconds=int(time.time() - start), |
|
attempts=attempt, |
|
volume_status=volume_status) |
|
|
|
def _decode_files(self, injected_files): |
|
"""Base64 decode the list of files to inject.""" |
|
if not injected_files: |
|
return [] |
|
|
|
def _decode(f): |
|
path, contents = f |
|
# Py3 raises binascii.Error instead of TypeError as in Py27 |
|
try: |
|
decoded = base64.b64decode(contents) |
|
return path, decoded |
|
except (TypeError, binascii.Error): |
|
raise exception.Base64Exception(path=path) |
|
|
|
return [_decode(f) for f in injected_files] |
|
|
|
def _validate_instance_group_policy(self, context, instance, |
|
scheduler_hints=None): |
|
|
|
if CONF.workarounds.disable_group_policy_check_upcall: |
|
return |
|
|
|
# NOTE(russellb) Instance group policy is enforced by the scheduler. |
|
# However, there is a race condition with the enforcement of |
|
# the policy. Since more than one instance may be scheduled at the |
|
# same time, it's possible that more than one instance with an |
|
# anti-affinity policy may end up here. It's also possible that |
|
# multiple instances with an affinity policy could end up on different |
|
# hosts. This is a validation step to make sure that starting the |
|
# instance here doesn't violate the policy. |
|
if scheduler_hints is not None: |
|
# only go through here if scheduler_hints is provided, |
|
# even if it is empty. |
|
group_hint = scheduler_hints.get('group') |
|
if not group_hint: |
|
return |
|
else: |
|
# The RequestSpec stores scheduler_hints as key=list pairs |
|
# so we need to check the type on the value and pull the |
|
# single entry out. The API request schema validates that |
|
# the 'group' hint is a single value. |
|
if isinstance(group_hint, list): |
|
group_hint = group_hint[0] |
|
try: |
|
group = objects.InstanceGroup.get_by_hint( |
|
context, group_hint |
|
) |
|
except exception.InstanceGroupNotFound: |
|
return |
|
else: |
|
# TODO(ganso): a call to DB can be saved by adding request_spec |
|
# to rpcapi payload of live_migration, pre_live_migration and |
|
# check_can_live_migrate_destination |
|
try: |
|
group = objects.InstanceGroup.get_by_instance_uuid( |
|
context, instance.uuid |
|
) |
|
except exception.InstanceGroupNotFound: |
|
return |
|
|
|
@utils.synchronized(group['uuid']) |
|
def _do_validation(context, instance, group): |
|
if group.policy and 'anti-affinity' == group.policy: |
|
|
|
# instances on host |
|
instances_uuids = objects.InstanceList.get_uuids_by_host( |
|
context, self.host) |
|
ins_on_host = set(instances_uuids) |
|
|
|
                # The instance param is just for logging; the nodename

                # obtained is not actually related to the instance at all.
|
nodename = self._get_nodename(instance) |
|
|
|
# instances being migrated to host |
|
migrations = ( |
|
objects.MigrationList.get_in_progress_by_host_and_node( |
|
context, self.host, nodename)) |
|
migration_vm_uuids = {mig.instance_uuid for mig in migrations} |
|
|
|
total_instances = migration_vm_uuids | ins_on_host |
|
|
|
# refresh group to get updated members within locked block |
|
group = objects.InstanceGroup.get_by_uuid(context, |
|
group['uuid']) |
|
members = set(group.members) |
|
# Determine the set of instance group members on this host |
|
# which are not the instance in question. This is used to |
|
# determine how many other members from the same anti-affinity |
|
# group can be on this host. |
|
members_on_host = (total_instances & members - |
|
set([instance.uuid])) |
|
rules = group.rules |
|
if rules and 'max_server_per_host' in rules: |
|
max_server = rules['max_server_per_host'] |
|
else: |
|
max_server = 1 |
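
                # e.g. with a max_server_per_host rule of 2, this instance is

                # rejected only once two other group members already run on

                # (or are migrating to) this host.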
|
if len(members_on_host) >= max_server: |
|
msg = _("Anti-affinity instance group policy " |
|
"was violated.") |
|
raise exception.RescheduledException( |
|
instance_uuid=instance.uuid, |
|
reason=msg) |
|
|
|
            # NOTE(ganso): The affinity check below does not really work and

            # can easily be violated because the lock is taken independently

            # on each compute host.

            # The only fix seems to be a DB lock taken whenever the host

            # field is set on an instance.
|
elif group.policy and 'affinity' == group.policy: |
|
group_hosts = group.get_hosts(exclude=[instance.uuid]) |
|
if group_hosts and self.host not in group_hosts: |
|
msg = _("Affinity instance group policy was violated.") |
|
raise exception.RescheduledException( |
|
instance_uuid=instance.uuid, |
|
reason=msg) |
|
|
|
_do_validation(context, instance, group) |
|
|
|
def _log_original_error(self, exc_info, instance_uuid): |
|
LOG.error('Error: %s', exc_info[1], instance_uuid=instance_uuid, |
|
exc_info=exc_info) |
|
|
|
@periodic_task.periodic_task |
|
def _check_instance_build_time(self, context): |
|
"""Ensure that instances are not stuck in build.""" |
|
timeout = CONF.instance_build_timeout |
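
        # The timeout is in seconds, measured against instance.created_at;

        # 0 disables the check.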
|
if timeout == 0: |
|
return |
|
|
|
filters = {'vm_state': vm_states.BUILDING, |
|
'host': self.host} |
|
|
|
building_insts = objects.InstanceList.get_by_filters(context, |
|
filters, expected_attrs=[], use_slave=True) |
|
|
|
for instance in building_insts: |
|
if timeutils.is_older_than(instance.created_at, timeout): |
|
self._set_instance_obj_error_state(instance) |
|
LOG.warning("Instance build timed out. Set to error " |
|
"state.", instance=instance) |
|
|
|
def _check_instance_exists(self, instance): |
|
"""Ensure an instance with the same name is not already present.""" |
|
if self.driver.instance_exists(instance): |
|
raise exception.InstanceExists(name=instance.name) |
|
|
|
def _allocate_network_async(self, context, instance, requested_networks, |
|
security_groups, resource_provider_mapping, |
|
network_arqs): |
|
"""Method used to allocate networks in the background. |
|
|
|
Broken out for testing. |
|
""" |
|
# First check to see if we're specifically not supposed to allocate |
|
# networks because if so, we can exit early. |
|
if requested_networks and requested_networks.no_allocate: |
|
LOG.debug("Not allocating networking since 'none' was specified.", |
|
instance=instance) |
|
return network_model.NetworkInfo([]) |
|
|
|
LOG.debug("Allocating IP information in the background.", |
|
instance=instance) |
|
retries = CONF.network_allocate_retries |
|
attempts = retries + 1 |
|
retry_time = 1 |
|
bind_host_id = self.driver.network_binding_host_id(context, instance) |
|
for attempt in range(1, attempts + 1): |
|
try: |
|
nwinfo = self.network_api.allocate_for_instance( |
|
context, instance, |
|
requested_networks=requested_networks, |
|
security_groups=security_groups, |
|
bind_host_id=bind_host_id, |
|
resource_provider_mapping=resource_provider_mapping, |
|
network_arqs=network_arqs) |
|
LOG.debug('Instance network_info: |%s|', nwinfo, |
|
instance=instance) |
|
instance.system_metadata['network_allocated'] = 'True' |
|
# NOTE(JoshNang) do not save the instance here, as it can cause |
|
# races. The caller shares a reference to instance and waits |
|
# for this async greenthread to finish before calling |
|
# instance.save(). |
|
return nwinfo |
|
except Exception as e: |
|
log_info = {'attempt': attempt, |
|
'attempts': attempts} |
|
if attempt == attempts: |
|
LOG.exception('Instance failed network setup ' |
|
'after %(attempts)d attempt(s)', |
|
log_info) |
|
raise e |
|
LOG.warning('Instance failed network setup ' |
|
'(attempt %(attempt)d of %(attempts)d)', |
|
log_info, instance=instance) |
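
                # Back off exponentially between retries (1, 2, 4, ...

                # seconds, capped at 30), yielding cooperatively while we

                # wait.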
|
                greenthread.sleep(retry_time)
|
retry_time *= 2 |
|
if retry_time > 30: |
|
retry_time = 30 |
|
# Not reached. |
|
|
|
def _build_networks_for_instance(self, context, instance, |
|
requested_networks, security_groups, resource_provider_mapping, |
|
network_arqs): |
|
|
|
# If we're here from a reschedule the network may already be allocated. |
|
if strutils.bool_from_string( |
|
instance.system_metadata.get('network_allocated', 'False')): |
|
            # NOTE(alex_xu): network_allocated being True means the network

            # resources were already allocated during a previous scheduling

            # attempt and the network setup was cleaned up on the previous

            # host. After rescheduling, the network needs to be set up on the

            # new host.
|
self.network_api.setup_instance_network_on_host( |
|
context, instance, instance.host) |
|
return self.network_api.get_instance_nw_info(context, instance) |
|
|
|
network_info = self._allocate_network(context, instance, |
|
requested_networks, security_groups, |
|
resource_provider_mapping, |
|
network_arqs) |
|
|
|
return network_info |
|
|
|
def _allocate_network(self, context, instance, requested_networks, |
|
security_groups, resource_provider_mapping, |
|
network_arqs): |
|
"""Start network allocation asynchronously. Return an instance |
|
of NetworkInfoAsyncWrapper that can be used to retrieve the |
|
allocated networks when the operation has finished. |
|
""" |
|
# NOTE(comstud): Since we're allocating networks asynchronously, |
|
# this task state has little meaning, as we won't be in this |
|
# state for very long. |
|
instance.vm_state = vm_states.BUILDING |
|
instance.task_state = task_states.NETWORKING |
|
instance.save(expected_task_state=[None]) |
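
        # The wrapper below kicks off _allocate_network_async in a

        # background greenthread right away; callers block on the result

        # only when they first access the returned network_info.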
|
|
|
return network_model.NetworkInfoAsyncWrapper( |
|
self._allocate_network_async, context, instance, |
|
requested_networks, security_groups, resource_provider_mapping, |
|
network_arqs) |
|
|
|
def _default_root_device_name(self, instance, image_meta, root_bdm): |
|
"""Gets a default root device name from the driver. |
|
|
|
:param nova.objects.Instance instance: |
|
The instance for which to get the root device name. |
|
:param nova.objects.ImageMeta image_meta: |
|
The metadata of the image of the instance. |
|
:param nova.objects.BlockDeviceMapping root_bdm: |
|
The description of the root device. |
|
:returns: str -- The default root device name. |
|
:raises: InternalError, TooManyDiskDevices |
|
""" |
|
try: |
|
return self.driver.default_root_device_name(instance, |
|
image_meta, |
|
root_bdm) |
|
except NotImplementedError: |
|
return compute_utils.get_next_device_name(instance, []) |
|
|
|
def _default_device_names_for_instance(self, instance, |
|
root_device_name, |
|
*block_device_lists): |
|
"""Default the missing device names in the BDM from the driver. |
|
|
|
:param nova.objects.Instance instance: |
|
The instance for which to get default device names. |
|
:param str root_device_name: The root device name. |
|
:param list block_device_lists: List of block device mappings. |
|
:returns: None |
|
:raises: InternalError, TooManyDiskDevices |
|
""" |
|
try: |
|
self.driver.default_device_names_for_instance(instance, |
|
root_device_name, |
|
*block_device_lists) |
|
except NotImplementedError: |
|
compute_utils.default_device_names_for_instance( |
|
instance, root_device_name, *block_device_lists) |
|
|
|
def _get_device_name_for_instance(self, instance, bdms, block_device_obj): |
|
"""Get the next device name from the driver, based on the BDM. |
|
|
|
:param nova.objects.Instance instance: |
|
The instance whose volume is requesting a device name. |
|
:param nova.objects.BlockDeviceMappingList bdms: |
|
The block device mappings for the instance. |
|
:param nova.objects.BlockDeviceMapping block_device_obj: |
|
A block device mapping containing info about the requested block |
|
device. |
|
:returns: The next device name. |
|
:raises: InternalError, TooManyDiskDevices |
|
""" |
|
# NOTE(ndipanov): Copy obj to avoid changing the original |
|
block_device_obj = block_device_obj.obj_clone() |
|
try: |
|
return self.driver.get_device_name_for_instance( |
|
instance, bdms, block_device_obj) |
|
except NotImplementedError: |
|
return compute_utils.get_device_name_for_instance( |
|
instance, bdms, block_device_obj.get("device_name")) |
|
|
|
def _default_block_device_names(self, instance, image_meta, block_devices): |
|
"""Verify that all the devices have the device_name set. If not, |
|
provide a default name. |
|
|
|
        It also ensures that there is a root_device_name and that it is set

        to the first block device in the boot sequence (boot_index=0).
|
""" |
|
root_bdm = block_device.get_root_bdm(block_devices) |
|
if not root_bdm: |
|
return |
|
|
|
# Get the root_device_name from the root BDM or the instance |
|
root_device_name = None |
|
update_root_bdm = False |
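
        # Precedence: an explicit device_name on the root BDM wins, then a

        # name already recorded on the instance, and finally a

        # driver-chosen default.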
|
|
|
if root_bdm.device_name: |
|
root_device_name = root_bdm.device_name |
|
instance.root_device_name = root_device_name |
|
elif instance.root_device_name: |
|
root_device_name = instance.root_device_name |
|
root_bdm.device_name = root_device_name |
|
update_root_bdm = True |
|
else: |
|
root_device_name = self._default_root_device_name(instance, |
|
image_meta, |
|
root_bdm) |
|
|
|
instance.root_device_name = root_device_name |
|
root_bdm.device_name = root_device_name |
|
update_root_bdm = True |
|
|
|
if update_root_bdm: |
|
root_bdm.save() |
|
|
|
ephemerals = [] |
|
swap = [] |
|
block_device_mapping = [] |
|
|
|
for device in block_devices: |
|
if block_device.new_format_is_ephemeral(device): |
|
ephemerals.append(device) |
|
|
|
if block_device.new_format_is_swap(device): |
|
swap.append(device) |
|
|
|
if driver_block_device.is_block_device_mapping(device): |
|
block_device_mapping.append(device) |
|
|
|
self._default_device_names_for_instance(instance, |
|
root_device_name, |
|
ephemerals, |
|
swap, |
|
block_device_mapping) |
|
|
|
def _add_missing_dev_names(self, bdms, instance): |
|
for bdm in bdms: |
|
if bdm.device_name is not None: |
|
continue |
|
|
|
device_name = self._get_device_name_for_instance(instance, |
|
bdms, bdm) |
|
values = {'device_name': device_name} |
|
bdm.update(values) |
|
bdm.save() |
|
|
|
def _prep_block_device(self, context, instance, bdms): |
|
"""Set up the block device for an instance with error logging.""" |
|
try: |
|
self._add_missing_dev_names(bdms, instance) |
|
block_device_info = driver.get_block_device_info(instance, bdms) |
|
mapping = driver.block_device_info_get_mapping(block_device_info) |
|
driver_block_device.attach_block_devices( |
|
mapping, context, instance, self.volume_api, self.driver, |
|
wait_func=self._await_block_device_map_created) |
|
|
|
return block_device_info |
|
|
|
except exception.OverQuota as e: |
|
LOG.warning('Failed to create block device for instance due' |
|
' to exceeding volume related resource quota.' |
|
' Error: %s', e.message, instance=instance) |
|
raise |
|
|
|
except Exception as ex: |
|
LOG.exception('Instance failed block device setup', |
|
instance=instance) |
|
# InvalidBDM will eventually result in a BuildAbortException when |
|
# booting from volume, and will be recorded as an instance fault. |
|
            # Maintain the original exception message, which most likely has

            # useful details that the standard InvalidBDM error message lacks.
|
raise exception.InvalidBDM(str(ex)) |
|
|
|
def _update_instance_after_spawn(self, instance, |
|
vm_state=vm_states.ACTIVE): |
|
instance.power_state = self._get_power_state(instance) |
|
instance.vm_state = vm_state |
|
instance.task_state = None |
|
# NOTE(sean-k-mooney): configdrive.update_instance checks |
|
# instance.launched_at to determine if it is the first or |
|
# subsequent spawn of an instance. We need to call update_instance |
|
# first before setting instance.launched_at or instance.config_drive |
|
# will never be set to true based on the value of force_config_drive. |
|
# As a result the config drive will be lost on a hard reboot of the |
|
# instance even when force_config_drive=true. see bug #1835822. |
|
configdrive.update_instance(instance) |
|
instance.launched_at = timeutils.utcnow() |
|
|
|
def _update_scheduler_instance_info(self, context, instance): |
|
"""Sends an InstanceList with created or updated Instance objects to |
|
the Scheduler client. |
|
|
|
In the case of init_host, the value passed will already be an |
|
InstanceList. Other calls will send individual Instance objects that |
|
have been created or resized. In this case, we create an InstanceList |
|
object containing that Instance. |
|
""" |
|
if not self.send_instance_updates: |
|
return |
|
if isinstance(instance, obj_instance.Instance): |
|
instance = objects.InstanceList(objects=[instance]) |
|
context = context.elevated() |
|
self.query_client.update_instance_info(context, self.host, |
|
instance) |
|
|
|
def _delete_scheduler_instance_info(self, context, instance_uuid): |
|
"""Sends the uuid of the deleted Instance to the Scheduler client.""" |
|
if not self.send_instance_updates: |
|
return |
|
context = context.elevated() |
|
self.query_client.delete_instance_info(context, self.host, |
|
instance_uuid) |
|
|
|
@periodic_task.periodic_task(spacing=CONF.scheduler_instance_sync_interval) |
|
def _sync_scheduler_instance_info(self, context): |
|
if not self.send_instance_updates: |
|
return |
|
context = context.elevated() |
|
instances = objects.InstanceList.get_by_host(context, self.host, |
|
expected_attrs=[], |
|
use_slave=True) |
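
        # use_slave=True lets the query be served by a DB replica; slightly

        # stale data is acceptable for this periodic full resync.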
|
uuids = [instance.uuid for instance in instances] |
|
self.query_client.sync_instance_info(context, self.host, uuids) |
|
|
|
def _notify_about_instance_usage(self, context, instance, event_suffix, |
|
network_info=None, extra_usage_info=None, |
|
fault=None): |
|
compute_utils.notify_about_instance_usage( |
|
self.notifier, context, instance, event_suffix, |
|
network_info=network_info, |
|
extra_usage_info=extra_usage_info, fault=fault) |
|
|
|
def _deallocate_network(self, context, instance, |
|
requested_networks=None): |
|
# If we were told not to allocate networks let's save ourselves |
|
# the trouble of calling the network API. |
|
if requested_networks and requested_networks.no_allocate: |
|
LOG.debug("Skipping network deallocation for instance since " |
|
"networking was not requested.", instance=instance) |
|
return |
|
|
|
LOG.debug('Deallocating network for instance', instance=instance) |
|
with timeutils.StopWatch() as timer: |
|
self.network_api.deallocate_for_instance( |
|
context, instance, requested_networks=requested_networks) |
|
# nova-network does an rpc call so we're OK tracking time spent here |
|
LOG.info('Took %0.2f seconds to deallocate network for instance.', |
|
timer.elapsed(), instance=instance) |
|
|
|
def _get_instance_block_device_info(self, context, instance, |
|
refresh_conn_info=False, |
|
bdms=None): |
|
"""Transform block devices to the driver block_device format.""" |
|
|
|
if bdms is None: |
|
bdms = objects.BlockDeviceMappingList.get_by_instance_uuid( |
|
context, instance.uuid) |
|
block_device_info = driver.get_block_device_info(instance, bdms) |
|
|
|
if not refresh_conn_info: |
|
# if the block_device_mapping has no value in connection_info |
|
# (returned as None), don't include in the mapping |
|
block_device_info['block_device_mapping'] = [ |
|
bdm for bdm in driver.block_device_info_get_mapping( |
|
block_device_info) |
|
if bdm.get('connection_info')] |
|
else: |
|
driver_block_device.refresh_conn_infos( |
|
driver.block_device_info_get_mapping(block_device_info), |
|
context, instance, self.volume_api, self.driver) |
|
|
|
return block_device_info |
|
|
|
def _build_failed(self, node): |
|
if CONF.compute.consecutive_build_service_disable_threshold: |
|
# NOTE(danms): Update our counter, but wait for the next |
|
# update_available_resource() periodic to flush it to the DB |
|
self.rt.build_failed(node) |
|
|
|
def _build_succeeded(self, node): |
|
self.rt.build_succeeded(node) |
|
|
|
@wrap_exception() |
|
@reverts_task_state |
|
@wrap_instance_fault |
|
def build_and_run_instance(self, context, instance, image, request_spec, |
|
filter_properties, accel_uuids, admin_password=None, |
|
injected_files=None, requested_networks=None, |
|
security_groups=None, block_device_mapping=None, |
|
node=None, limits=None, host_list=None): |
|
|
|
@utils.synchronized(instance.uuid) |
|
def _locked_do_build_and_run_instance(*args, **kwargs): |
|
# NOTE(danms): We grab the semaphore with the instance uuid |
|
# locked because we could wait in line to build this instance |
|
# for a while and we want to make sure that nothing else tries |
|
# to do anything with this instance while we wait. |
|
with self._build_semaphore: |
|
try: |
|
result = self._do_build_and_run_instance(*args, **kwargs) |
|
except Exception: |
|
# NOTE(mriedem): This should really only happen if |
|
# _decode_files in _do_build_and_run_instance fails, and |
|
# that's before a guest is spawned so it's OK to remove |
|
# allocations for the instance for this node from Placement |
|
# below as there is no guest consuming resources anyway. |
|
# The _decode_files case could be handled more specifically |
|
# but that's left for another day. |
|
result = build_results.FAILED |
|
raise |
|
finally: |
|
if result == build_results.FAILED: |
|
# Remove the allocation records from Placement for the |
|
# instance if the build failed. The instance.host is |
|
# likely set to None in _do_build_and_run_instance |
|
# which means if the user deletes the instance, it |
|
# will be deleted in the API, not the compute service. |
|
# Setting the instance.host to None in |
|
# _do_build_and_run_instance means that the |
|
# ResourceTracker will no longer consider this instance |
|
# to be claiming resources against it, so we want to |
|
# reflect that same thing in Placement. No need to |
|
# call this for a reschedule, as the allocations will |
|
# have already been removed in |
|
# self._do_build_and_run_instance(). |
|
self.reportclient.delete_allocation_for_instance( |
|
context, instance.uuid, force=True) |
|
|
|
if result in (build_results.FAILED, |
|
build_results.RESCHEDULED): |
|
self._build_failed(node) |
|
else: |
|
self._build_succeeded(node) |
|
|
|
# NOTE(danms): We spawn here to return the RPC worker thread back to |
|
# the pool. Since what follows could take a really long time, we don't |
|
# want to tie up RPC workers. |
|
utils.spawn_n(_locked_do_build_and_run_instance, |
|
context, instance, image, request_spec, |
|
filter_properties, admin_password, injected_files, |
|
requested_networks, security_groups, |
|
block_device_mapping, node, limits, host_list, |
|
accel_uuids) |
|
|
|
def _check_device_tagging(self, requested_networks, block_device_mapping): |
|
tagging_requested = False |
|
if requested_networks: |
|
for net in requested_networks: |
|
if 'tag' in net and net.tag is not None: |
|
tagging_requested = True |
|
break |
|
if block_device_mapping and not tagging_requested: |
|
for bdm in block_device_mapping: |
|
if 'tag' in bdm and bdm.tag is not None: |
|
tagging_requested = True |
|
break |
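
        # Device tags are surfaced to the guest through the metadata API and

        # config drive, so the virt driver must support tagging for them to

        # have any effect.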
|
if (tagging_requested and |
|
not self.driver.capabilities.get('supports_device_tagging', |
|
False)): |
|
raise exception.BuildAbortException('Attempt to boot guest with ' |
|
'tagged devices on host that ' |
|
'does not support tagging.') |
|
|
|
def _check_trusted_certs(self, instance): |
|
if (instance.trusted_certs and |
|
not self.driver.capabilities.get('supports_trusted_certs', |
|
False)): |
|
raise exception.BuildAbortException( |
|
'Trusted image certificates provided on host that does not ' |
|
'support certificate validation.') |
|
|
|
@wrap_exception() |
|
@reverts_task_state |
|
@wrap_instance_event(prefix='compute') |
|
@wrap_instance_fault |
|
def _do_build_and_run_instance(self, context, instance, image, |
|
request_spec, filter_properties, admin_password, injected_files, |
|
requested_networks, security_groups, block_device_mapping, |
|
node=None, limits=None, host_list=None, accel_uuids=None): |
|
|
|
try: |
|
LOG.debug('Starting instance...', instance=instance) |
|
instance.vm_state = vm_states.BUILDING |
|
instance.task_state = None |
|
instance.save(expected_task_state= |
|
(task_states.SCHEDULING, None)) |
|
except exception.InstanceNotFound: |
|
msg = 'Instance disappeared before build.' |
|
LOG.debug(msg, instance=instance) |
|
return build_results.FAILED |
|
except exception.UnexpectedTaskStateError as e: |
|
LOG.debug(e.format_message(), instance=instance) |
|
return build_results.FAILED |
|
|
|
# b64 decode the files to inject: |
|
decoded_files = self._decode_files(injected_files) |
|
|
|
if limits is None: |
|
limits = {} |
|
|
|
if node is None: |
|
node = self._get_nodename(instance, refresh=True) |
|
|
|
try: |
|
with timeutils.StopWatch() as timer: |
|
self._build_and_run_instance(context, instance, image, |
|
decoded_files, admin_password, requested_networks, |
|
security_groups, block_device_mapping, node, limits, |
|
filter_properties, request_spec, accel_uuids) |
|
LOG.info('Took %0.2f seconds to build instance.', |
|
timer.elapsed(), instance=instance) |
|
return build_results.ACTIVE |
|
except exception.RescheduledException as e: |
|
retry = filter_properties.get('retry') |
|
if not retry: |
|
# no retry information, do not reschedule. |
|
|