# tricircle/tricircle/proxy/compute_manager.py

# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import contextlib
import functools
import sys
import time
import traceback

from oslo_config import cfg
import oslo_log.log as logging
import oslo_messaging as messaging
from oslo_utils import excutils
from oslo_utils import strutils
import six

from tricircle.common.i18n import _
from tricircle.common.i18n import _LE
from tricircle.common.i18n import _LW
from tricircle.common.nova_lib import block_device
from tricircle.common.nova_lib import compute_manager
from tricircle.common.nova_lib import compute_utils
from tricircle.common.nova_lib import conductor
from tricircle.common.nova_lib import driver_block_device
from tricircle.common.nova_lib import exception
from tricircle.common.nova_lib import manager
from tricircle.common.nova_lib import network
from tricircle.common.nova_lib import network_model
from tricircle.common.nova_lib import objects
from tricircle.common.nova_lib import openstack_driver
from tricircle.common.nova_lib import pipelib
from tricircle.common.nova_lib import rpc
from tricircle.common.nova_lib import task_states
from tricircle.common.nova_lib import utils
from tricircle.common.nova_lib import vm_states
from tricircle.common.nova_lib import volume
import tricircle.common.utils as t_utils

CONF = cfg.CONF
compute_opts = [
cfg.StrOpt('default_access_ip_network_name',
help='Name of network to use to set access IPs for instances'),
cfg.IntOpt('network_allocate_retries',
default=0,
help="Number of times to retry network allocation on failures"),
]
CONF.register_opts(compute_opts)
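
# A hypothetical nova.conf fragment exercising the options registered above
# (values are illustrative only, not defaults shipped with the project):
#
#     [DEFAULT]
#     default_access_ip_network_name = public
#     network_allocate_retries = 3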
LOG = logging.getLogger(__name__)
SERVICE_NAME = 'proxy_compute'
get_notifier = functools.partial(rpc.get_notifier, service=SERVICE_NAME)
wrap_exception = functools.partial(exception.wrap_exception,
get_notifier=get_notifier)
reverts_task_state = compute_manager.reverts_task_state
wrap_instance_fault = compute_manager.wrap_instance_fault
wrap_instance_event = compute_manager.wrap_instance_event
class ProxyComputeManager(manager.Manager):
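    """Compute manager that proxies instance builds to a cascaded site.

    The build path mirrors nova's ComputeManager; the pieces that would
    touch a hypervisor are replaced by the _proxy_* and _heal_proxy_*
    hooks at the bottom of this class.
    """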
target = messaging.Target(version='4.0')
def __init__(self, *args, **kwargs):
self.is_neutron_security_groups = (
openstack_driver.is_neutron_security_groups())
self.use_legacy_block_device_info = False
self.network_api = network.API()
self.volume_api = volume.API()
self.conductor_api = conductor.API()
self.compute_task_api = conductor.ComputeTaskAPI()
super(ProxyComputeManager, self).__init__(
service_name=SERVICE_NAME, *args, **kwargs)
def _decode_files(self, injected_files):
"""Base64 decode the list of files to inject."""
if not injected_files:
return []
def _decode(f):
path, contents = f
try:
decoded = base64.b64decode(contents)
return path, decoded
except TypeError:
raise exception.Base64Exception(path=path)
return [_decode(f) for f in injected_files]
def _cleanup_allocated_networks(self, context, instance,
requested_networks):
try:
self._deallocate_network(context, instance, requested_networks)
except Exception:
msg = _LE('Failed to deallocate networks')
LOG.exception(msg, instance=instance)
return
instance.system_metadata['network_allocated'] = 'False'
try:
instance.save()
except exception.InstanceNotFound:
pass
def _deallocate_network(self, context, instance,
requested_networks=None):
LOG.debug('Deallocating network for instance', instance=instance)
self.network_api.deallocate_for_instance(
context, instance, requested_networks=requested_networks)
def _cleanup_volumes(self, context, instance_uuid, bdms, raise_exc=True):
exc_info = None
for bdm in bdms:
LOG.debug("terminating bdm %s", bdm,
instance_uuid=instance_uuid)
if bdm.volume_id and bdm.delete_on_termination:
try:
self.volume_api.delete(context, bdm.volume_id)
except Exception as exc:
exc_info = sys.exc_info()
                    LOG.warn(_LW('Failed to delete volume: %(volume_id)s '
                                 'due to %(exc)s'),
                             {'volume_id': bdm.volume_id,
                              'exc': six.text_type(exc)})
if exc_info is not None and raise_exc:
six.reraise(exc_info[0], exc_info[1], exc_info[2])
def _instance_update(self, context, instance, **kwargs):
"""Update an instance in the database using kwargs as value."""
for k, v in kwargs.items():
setattr(instance, k, v)
instance.save()
def _set_instance_obj_error_state(self, context, instance,
clean_task_state=False):
try:
instance.vm_state = vm_states.ERROR
if clean_task_state:
instance.task_state = None
instance.save()
except exception.InstanceNotFound:
LOG.debug('Instance has been destroyed from under us while '
'trying to set it to ERROR', instance=instance)
def _notify_about_instance_usage(self, context, instance, event_suffix,
network_info=None, system_metadata=None,
extra_usage_info=None, fault=None):
compute_utils.notify_about_instance_usage(
self.notifier, context, instance, event_suffix,
network_info=network_info,
system_metadata=system_metadata,
extra_usage_info=extra_usage_info, fault=fault)
def _validate_instance_group_policy(self, context, instance,
filter_properties):
# NOTE(russellb) Instance group policy is enforced by the scheduler.
# However, there is a race condition with the enforcement of
# anti-affinity. Since more than one instance may be scheduled at the
# same time, it's possible that more than one instance with an
# anti-affinity policy may end up here. This is a validation step to
# make sure that starting the instance here doesn't violate the policy.
scheduler_hints = filter_properties.get('scheduler_hints') or {}
group_hint = scheduler_hints.get('group')
if not group_hint:
return
@utils.synchronized(group_hint)
def _do_validation(context, instance, group_hint):
group = objects.InstanceGroup.get_by_hint(context, group_hint)
if 'anti-affinity' not in group.policies and (
'affinity' not in group.policies):
return
group_hosts = group.get_hosts(context, exclude=[instance.uuid])
if self.host in group_hosts:
if 'anti-affinity' in group.policies:
msg = _("Anti-affinity instance group policy "
"was violated.")
raise exception.RescheduledException(
instance_uuid=instance.uuid,
reason=msg)
elif group_hosts and [self.host] != group_hosts:
                # NOTE(huawei) The native code only considered the
                # anti-affinity policy, but the affinity policy has the same
                # problem, so we also add a check for instances with an
                # affinity policy.
if 'affinity' in group.policies:
msg = _("affinity instance group policy was violated.")
raise exception.RescheduledException(
instance_uuid=instance.uuid,
reason=msg)
_do_validation(context, instance, group_hint)
@wrap_exception()
@reverts_task_state
@wrap_instance_fault
def build_and_run_instance(
self, context, host, instance, image, request_spec,
filter_properties, admin_password=None, injected_files=None,
requested_networks=None, security_groups=None,
block_device_mapping=None, node=None, limits=None):
if (requested_networks and
not isinstance(requested_networks,
objects.NetworkRequestList)):
requested_networks = objects.NetworkRequestList(
objects=[objects.NetworkRequest.from_tuple(t)
for t in requested_networks])
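
        # NOTE: the per-instance-UUID lock serializes concurrent builds of
        # the same instance, while spawn_n runs the build in a green thread
        # so this RPC handler returns immediately.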
@utils.synchronized(instance.uuid)
def _locked_do_build_and_run_instance(*args, **kwargs):
self._do_build_and_run_instance(*args, **kwargs)
utils.spawn_n(_locked_do_build_and_run_instance,
context, host, instance, image, request_spec,
filter_properties, admin_password, injected_files,
requested_networks, security_groups,
block_device_mapping, node, limits)
@wrap_exception()
@reverts_task_state
@wrap_instance_event
@wrap_instance_fault
def _do_build_and_run_instance(self, context, host, instance, image,
request_spec, filter_properties,
admin_password, injected_files,
requested_networks, security_groups,
block_device_mapping, node=None,
limits=None):
try:
            LOG.debug('Starting instance...', context=context,
                      instance=instance)
instance.vm_state = vm_states.BUILDING
instance.task_state = None
instance.save(expected_task_state=(task_states.SCHEDULING, None))
except exception.InstanceNotFound:
msg = 'Instance disappeared before build.'
LOG.debug(msg, instance=instance)
return
except exception.UnexpectedTaskStateError as e:
LOG.debug(e.format_message(), instance=instance)
return
# b64 decode the files to inject:
decoded_files = self._decode_files(injected_files)
if limits is None:
limits = {}
if node is None:
node = t_utils.get_node_name(host)
LOG.debug('No node specified, defaulting to %s', node,
instance=instance)
try:
self._build_and_run_instance(
context, host, instance, image, request_spec, decoded_files,
admin_password, requested_networks, security_groups,
block_device_mapping, node, limits, filter_properties)
except exception.RescheduledException as e:
LOG.debug(e.format_message(), instance=instance)
retry = filter_properties.get('retry', None)
if not retry:
# no retry information, do not reschedule.
LOG.debug("Retry info not present, will not reschedule",
instance=instance)
self._cleanup_allocated_networks(context, instance,
requested_networks)
compute_utils.add_instance_fault_from_exc(
context, instance, e, sys.exc_info())
self._set_instance_obj_error_state(context, instance,
clean_task_state=True)
return
retry['exc'] = traceback.format_exception(*sys.exc_info())
self.network_api.cleanup_instance_network_on_host(
context, instance, self.host)
instance.task_state = task_states.SCHEDULING
instance.save()
self.compute_task_api.build_instances(
context, [instance], image, filter_properties, admin_password,
injected_files, requested_networks, security_groups,
block_device_mapping)
except (exception.InstanceNotFound,
exception.UnexpectedDeletingTaskStateError):
msg = 'Instance disappeared during build.'
LOG.debug(msg, instance=instance)
self._cleanup_allocated_networks(context, instance,
requested_networks)
except exception.BuildAbortException as e:
LOG.exception(e.format_message(), instance=instance)
self._cleanup_allocated_networks(context, instance,
requested_networks)
self._cleanup_volumes(context, instance.uuid,
block_device_mapping, raise_exc=False)
compute_utils.add_instance_fault_from_exc(
context, instance, e, sys.exc_info())
self._set_instance_obj_error_state(context, instance,
clean_task_state=True)
except Exception as e:
# should not reach here.
msg = _LE('Unexpected build failure, not rescheduling build.')
LOG.exception(msg, instance=instance)
self._cleanup_allocated_networks(context, instance,
requested_networks)
self._cleanup_volumes(context, instance.uuid,
block_device_mapping, raise_exc=False)
compute_utils.add_instance_fault_from_exc(context, instance,
e, sys.exc_info())
self._set_instance_obj_error_state(context, instance,
clean_task_state=True)
def _get_instance_nw_info(self, context, instance, use_slave=False):
"""Get a list of dictionaries of network data of an instance."""
return self.network_api.get_instance_nw_info(context, instance,
use_slave=use_slave)
def _allocate_network(self, context, instance, requested_networks, macs,
security_groups, dhcp_options):
"""Start network allocation asynchronously.
Return an instance of NetworkInfoAsyncWrapper that can be used to
retrieve the allocated networks when the operation has finished.
"""
# NOTE(comstud): Since we're allocating networks asynchronously,
# this task state has little meaning, as we won't be in this
# state for very long.
instance.vm_state = vm_states.BUILDING
instance.task_state = task_states.NETWORKING
instance.save(expected_task_state=[None])
is_vpn = pipelib.is_vpn_image(instance.image_ref)
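        # NetworkInfoAsyncWrapper runs _allocate_network_async in a green
        # thread; callers block when they first access the result, or can
        # call wait() explicitly as _build_resources does during cleanup.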
return network_model.NetworkInfoAsyncWrapper(
self._allocate_network_async, context, instance,
requested_networks, macs, security_groups, is_vpn, dhcp_options)
def _allocate_network_async(self, context, instance, requested_networks,
macs, security_groups, is_vpn, dhcp_options):
"""Method used to allocate networks in the background.
Broken out for testing.
"""
LOG.debug("Allocating IP information in the background.",
instance=instance)
retries = CONF.network_allocate_retries
        if retries < 0:
            LOG.warn(_LW("Treating negative config value (%(retries)s) for "
                         "'network_allocate_retries' as 0."),
                     {'retries': retries})
        # One initial attempt plus N retries. The previous short-circuit
        # expression (retries > 1 and retries + 1 or 1) wrongly collapsed
        # retries=1 to a single attempt.
        attempts = max(retries, 0) + 1
retry_time = 1
for attempt in range(1, attempts + 1):
try:
nwinfo = self.network_api.allocate_for_instance(
context, instance, vpn=is_vpn,
requested_networks=requested_networks,
macs=macs, security_groups=security_groups,
dhcp_options=dhcp_options)
LOG.debug('Instance network_info: |%s|', nwinfo,
instance=instance)
instance.system_metadata['network_allocated'] = 'True'
# NOTE(JoshNang) do not save the instance here, as it can cause
# races. The caller shares a reference to instance and waits
# for this async greenthread to finish before calling
# instance.save().
return nwinfo
except Exception:
exc_info = sys.exc_info()
log_info = {'attempt': attempt,
'attempts': attempts}
if attempt == attempts:
LOG.exception(_LE('Instance failed network setup '
'after %(attempts)d attempt(s)'),
log_info)
                    six.reraise(exc_info[0], exc_info[1], exc_info[2])
                LOG.warn(_LW('Instance failed network setup '
                             '(attempt %(attempt)d of %(attempts)d)'),
                         log_info, instance=instance)
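                # Exponential backoff between attempts, capped at 30 seconds.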
time.sleep(retry_time)
retry_time *= 2
if retry_time > 30:
retry_time = 30
# Not reached.
def _build_networks_for_instance(self, context, instance,
requested_networks, security_groups):
# If we're here from a reschedule the network may already be allocated.
if strutils.bool_from_string(
instance.system_metadata.get('network_allocated', 'False')):
            # NOTE(alex_xu): network_allocated being True means the network
            # resources were already allocated in a previous scheduling pass
            # and the setup was torn down there; after rescheduling, they
            # need to be set up again on the new host.
self.network_api.setup_instance_network_on_host(
context, instance, instance.host)
return self._get_instance_nw_info(context, instance)
if not self.is_neutron_security_groups:
security_groups = []
# NOTE(zhiyuan) in ComputeManager, driver method "macs_for_instance"
# and "dhcp_options_for_instance" are called to get macs and
# dhcp_options, here we just set them to None
macs = None
dhcp_options = None
network_info = self._allocate_network(context, instance,
requested_networks, macs,
security_groups, dhcp_options)
if not instance.access_ip_v4 and not instance.access_ip_v6:
# If CONF.default_access_ip_network_name is set, grab the
# corresponding network and set the access ip values accordingly.
# Note that when there are multiple ips to choose from, an
# arbitrary one will be chosen.
network_name = CONF.default_access_ip_network_name
if not network_name:
return network_info
for vif in network_info:
if vif['network']['label'] == network_name:
for ip in vif.fixed_ips():
if ip['version'] == 4:
instance.access_ip_v4 = ip['address']
if ip['version'] == 6:
instance.access_ip_v6 = ip['address']
instance.save()
break
return network_info
    # NOTE(zhiyuan) This function prepares block devices for the driver and
    # cinder volumes, but in the nova proxy _proxy_run_instance does that
    # job, so remove this function once the cinder proxy is ready and we
    # have confirmed it is useless.
def _prep_block_device(self, context, instance, bdms,
do_check_attach=True):
"""Set up the block device for an instance with error logging."""
try:
block_device_info = {
'root_device_name': instance['root_device_name'],
'swap': driver_block_device.convert_swap(bdms),
'ephemerals': driver_block_device.convert_ephemerals(bdms),
'block_device_mapping': (
driver_block_device.attach_block_devices(
driver_block_device.convert_volumes(bdms),
context, instance, self.volume_api,
self.driver, do_check_attach=do_check_attach) +
driver_block_device.attach_block_devices(
driver_block_device.convert_snapshots(bdms),
context, instance, self.volume_api,
self.driver, self._await_block_device_map_created,
do_check_attach=do_check_attach) +
driver_block_device.attach_block_devices(
driver_block_device.convert_images(bdms),
context, instance, self.volume_api,
self.driver, self._await_block_device_map_created,
do_check_attach=do_check_attach) +
driver_block_device.attach_block_devices(
driver_block_device.convert_blanks(bdms),
context, instance, self.volume_api,
self.driver, self._await_block_device_map_created,
do_check_attach=do_check_attach))
}
if self.use_legacy_block_device_info:
for bdm_type in ('swap', 'ephemerals', 'block_device_mapping'):
block_device_info[bdm_type] = \
driver_block_device.legacy_block_devices(
block_device_info[bdm_type])
# Get swap out of the list
block_device_info['swap'] = driver_block_device.get_swap(
block_device_info['swap'])
return block_device_info
except exception.OverQuota:
msg = _LW('Failed to create block device for instance due to '
'being over volume resource quota')
LOG.warn(msg, instance=instance)
raise exception.InvalidBDM()
except Exception:
LOG.exception(_LE('Instance failed block device setup'),
instance=instance)
raise exception.InvalidBDM()
def _default_block_device_names(self, context, instance,
image_meta, block_devices):
"""Verify that all the devices have the device_name set.
If not, provide a default name. It also ensures that there is a
root_device_name and is set to the first block device in the boot
sequence (boot_index=0).
"""
root_bdm = block_device.get_root_bdm(block_devices)
if not root_bdm:
return
# Get the root_device_name from the root BDM or the instance
root_device_name = None
update_instance = False
update_root_bdm = False
if root_bdm.device_name:
root_device_name = root_bdm.device_name
instance.root_device_name = root_device_name
update_instance = True
elif instance.root_device_name:
root_device_name = instance.root_device_name
root_bdm.device_name = root_device_name
update_root_bdm = True
else:
            # NOTE(zhiyuan) if the driver doesn't implement the related
            # function, the fallback in compute_utils is called instead
root_device_name = compute_utils.get_next_device_name(instance, [])
instance.root_device_name = root_device_name
root_bdm.device_name = root_device_name
update_instance = update_root_bdm = True
if update_instance:
instance.save()
if update_root_bdm:
root_bdm.save()
        ephemerals = list(filter(block_device.new_format_is_ephemeral,
                                 block_devices))
        swap = list(filter(block_device.new_format_is_swap,
                           block_devices))
        block_device_mapping = list(filter(
            driver_block_device.is_block_device_mapping, block_devices))
        # NOTE(zhiyuan) if the driver doesn't implement the related function,
        # the fallback in compute_utils is called instead
compute_utils.default_device_names_for_instance(
instance, root_device_name, ephemerals, swap, block_device_mapping)
@contextlib.contextmanager
def _build_resources(self, context, instance, requested_networks,
security_groups, image, block_device_mapping):
resources = {}
network_info = None
try:
network_info = self._build_networks_for_instance(
context, instance, requested_networks, security_groups)
resources['network_info'] = network_info
except (exception.InstanceNotFound,
exception.UnexpectedDeletingTaskStateError):
raise
except exception.UnexpectedTaskStateError as e:
raise exception.BuildAbortException(instance_uuid=instance.uuid,
reason=e.format_message())
except Exception:
            # Because this allocation is async, any failures are likely to
            # occur when the driver accesses network_info during spawn().
LOG.exception(_LE('Failed to allocate network(s)'),
instance=instance)
msg = _('Failed to allocate the network(s), not rescheduling.')
raise exception.BuildAbortException(instance_uuid=instance.uuid,
reason=msg)
try:
# Verify that all the BDMs have a device_name set and assign a
# default to the ones missing it with the help of the driver.
self._default_block_device_names(context, instance, image,
block_device_mapping)
instance.vm_state = vm_states.BUILDING
instance.task_state = task_states.BLOCK_DEVICE_MAPPING
instance.save()
            # NOTE(zhiyuan) remove this commented-out code after the cinder
            # proxy is ready and we have confirmed that _prep_block_device
            # is useless
#
# block_device_info = self._prep_block_device(
# context, instance, block_device_mapping)
#
block_device_info = None
resources['block_device_info'] = block_device_info
except (exception.InstanceNotFound,
exception.UnexpectedDeletingTaskStateError):
            with excutils.save_and_reraise_exception():
# Make sure the async call finishes
if network_info is not None:
network_info.wait(do_raise=False)
except exception.UnexpectedTaskStateError as e:
# Make sure the async call finishes
if network_info is not None:
network_info.wait(do_raise=False)
raise exception.BuildAbortException(instance_uuid=instance.uuid,
reason=e.format_message())
except Exception:
LOG.exception(_LE('Failure prepping block device'),
instance=instance)
# Make sure the async call finishes
if network_info is not None:
network_info.wait(do_raise=False)
msg = _('Failure prepping block device.')
raise exception.BuildAbortException(instance_uuid=instance.uuid,
reason=msg)
self._heal_proxy_networks(context, instance, network_info)
cascaded_ports = self._heal_proxy_ports(
context, instance, network_info)
resources['cascaded_ports'] = cascaded_ports
try:
yield resources
except Exception as exc:
with excutils.save_and_reraise_exception() as ctxt:
if not isinstance(exc, (
exception.InstanceNotFound,
exception.UnexpectedDeletingTaskStateError)):
LOG.exception(_LE('Instance failed to spawn'),
instance=instance)
# Make sure the async call finishes
if network_info is not None:
network_info.wait(do_raise=False)
try:
self._shutdown_instance(context, instance,
block_device_mapping,
requested_networks,
try_deallocate_networks=False)
except Exception:
ctxt.reraise = False
msg = _('Could not clean up failed build,'
' not rescheduling')
raise exception.BuildAbortException(
instance_uuid=instance.uuid, reason=msg)
def _build_and_run_instance(self, context, host, instance, image,
request_spec, injected_files, admin_password,
requested_networks, security_groups,
block_device_mapping, node, limits,
filter_properties):
image_name = image.get('name')
self._notify_about_instance_usage(context, instance, 'create.start',
extra_usage_info={
'image_name': image_name})
try:
self._validate_instance_group_policy(context, instance,
filter_properties)
with self._build_resources(context, instance, requested_networks,
security_groups, image,
block_device_mapping) as resources:
instance.vm_state = vm_states.BUILDING
instance.task_state = task_states.SPAWNING
instance.save(
expected_task_state=task_states.BLOCK_DEVICE_MAPPING)
cascaded_ports = resources['cascaded_ports']
request_spec['block_device_mapping'] = block_device_mapping
request_spec['security_group'] = security_groups
self._proxy_run_instance(
context, instance, request_spec, filter_properties,
requested_networks, injected_files, admin_password,
None, host, node, None, cascaded_ports)
except (exception.InstanceNotFound,
exception.UnexpectedDeletingTaskStateError) as e:
with excutils.save_and_reraise_exception():
self._notify_about_instance_usage(context, instance,
'create.end', fault=e)
except exception.ComputeResourcesUnavailable as e:
LOG.debug(e.format_message(), instance=instance)
self._notify_about_instance_usage(context, instance,
'create.error', fault=e)
raise exception.RescheduledException(
instance_uuid=instance.uuid, reason=e.format_message())
except exception.BuildAbortException as e:
with excutils.save_and_reraise_exception():
LOG.debug(e.format_message(), instance=instance)
self._notify_about_instance_usage(context, instance,
'create.error', fault=e)
except (exception.FixedIpLimitExceeded,
exception.NoMoreNetworks) as e:
            LOG.warn(_LW('No more networks or fixed IPs to be allocated'),
                     instance=instance)
self._notify_about_instance_usage(context, instance,
'create.error', fault=e)
msg = _('Failed to allocate the network(s) with error %s, '
'not rescheduling.') % e.format_message()
raise exception.BuildAbortException(instance_uuid=instance.uuid,
reason=msg)
except (exception.VirtualInterfaceCreateException,
exception.VirtualInterfaceMacAddressException) as e:
LOG.exception(_LE('Failed to allocate network(s)'),
instance=instance)
self._notify_about_instance_usage(context, instance,
'create.error', fault=e)
msg = _('Failed to allocate the network(s), not rescheduling.')
raise exception.BuildAbortException(instance_uuid=instance.uuid,
reason=msg)
except (exception.FlavorDiskTooSmall,
exception.FlavorMemoryTooSmall,
exception.ImageNotActive,
exception.ImageUnacceptable) as e:
self._notify_about_instance_usage(context, instance,
'create.error', fault=e)
raise exception.BuildAbortException(instance_uuid=instance.uuid,
reason=e.format_message())
except Exception as e:
self._notify_about_instance_usage(context, instance,
'create.error', fault=e)
raise exception.RescheduledException(
instance_uuid=instance.uuid, reason=six.text_type(e))
def _shutdown_instance(self, context, instance, bdms,
requested_networks=None, notify=True,
try_deallocate_networks=True):
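        """Stub shutdown handler: only logs the request. Actual teardown is
        presumably performed in the cascaded site; _build_resources calls
        this with try_deallocate_networks=False when a build fails.
        """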
LOG.debug('Proxy stop instance')
# proxy new function below
def _heal_proxy_networks(self, context, instance, network_info):
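        """Placeholder hook for syncing instance networks to the cascaded
        site; currently a no-op.
        """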
pass
def _heal_proxy_ports(self, context, instance, network_info):
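        """Placeholder hook expected to return the ports created in the
        cascaded site; currently returns an empty list.
        """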
return []
def _proxy_run_instance(self, context, instance, request_spec=None,
filter_properties=None, requested_networks=None,
injected_files=None, admin_password=None,
is_first_time=False, host=None, node=None,
legacy_bdm_in_spec=True, physical_ports=None):
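        """Placeholder for actually booting the instance in the cascaded
        site; currently only logs the call.
        """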
LOG.debug('Proxy run instance')