add separated host-manager for nova solver scheduler
and modify the previous solver-scheduler accordingly. Change-Id: I353c1996c38853f961706a8d2c41cff3815f7d43
This commit is contained in:
parent
9c242528fd
commit
037e86fc49
@ -22,20 +22,25 @@ A default solver implementation that uses PULP is included.
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from nova import exception
|
||||
from nova.openstack.common.gettextutils import _
|
||||
from nova.openstack.common import importutils
|
||||
from nova.openstack.common import log as logging
|
||||
from nova.openstack.common.gettextutils import _
|
||||
from nova.scheduler import driver
|
||||
from nova.scheduler import filter_scheduler
|
||||
from nova.scheduler import weights
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
solver_opts = [cfg.StrOpt('scheduler_host_solver',
|
||||
default='nova.scheduler.solvers.pluggable_hosts_pulp_solver.HostsPulpSolver',
|
||||
help='The pluggable solver implementation to use. By default, a '
|
||||
solver_opts = [
|
||||
cfg.StrOpt('scheduler_host_solver',
|
||||
default='nova.scheduler.solvers'
|
||||
'.pluggable_hosts_pulp_solver.HostsPulpSolver',
|
||||
help='The pluggable solver implementation to use. By default, a '
|
||||
'reference solver implementation is included that models '
|
||||
'the problem as a Linear Programming (LP) problem using PULP.'),]
|
||||
'the problem as a Linear Programming (LP) problem using PULP.'),
|
||||
]
|
||||
|
||||
CONF.register_opts(solver_opts, group='solver_scheduler')
|
||||
|
||||
@ -50,6 +55,76 @@ class ConstraintSolverScheduler(filter_scheduler.FilterScheduler):
|
||||
self.hosts_solver = importutils.import_object(
|
||||
CONF.solver_scheduler.scheduler_host_solver)
|
||||
|
||||
def schedule_run_instance(self, context, request_spec,
|
||||
admin_password, injected_files,
|
||||
requested_networks, is_first_time,
|
||||
filter_properties, legacy_bdm_in_spec):
|
||||
"""This method is called from nova.compute.api to provision
|
||||
an instance. We first create a build plan (a list of WeightedHosts)
|
||||
and then provision.
|
||||
|
||||
Returns a list of the instances created.
|
||||
"""
|
||||
payload = dict(request_spec=request_spec)
|
||||
self.notifier.info(context, 'scheduler.run_instance.start', payload)
|
||||
|
||||
instance_uuids = request_spec.get('instance_uuids')
|
||||
LOG.info(_("Attempting to build %(num_instances)d instance(s) "
|
||||
"uuids: %(instance_uuids)s"),
|
||||
{'num_instances': len(instance_uuids),
|
||||
'instance_uuids': instance_uuids})
|
||||
LOG.debug(_("Request Spec: %s") % request_spec)
|
||||
|
||||
# Stuff network requests into filter_properties
|
||||
# NOTE (Xinyuan): currently for POC only.
|
||||
filter_properties['requested_networks'] = requested_networks
|
||||
|
||||
weighed_hosts = self._schedule(context, request_spec,
|
||||
filter_properties, instance_uuids)
|
||||
|
||||
# NOTE: Pop instance_uuids as individual creates do not need the
|
||||
# set of uuids. Do not pop before here as the upper exception
|
||||
# handler fo NoValidHost needs the uuid to set error state
|
||||
instance_uuids = request_spec.pop('instance_uuids')
|
||||
|
||||
# NOTE(comstud): Make sure we do not pass this through. It
|
||||
# contains an instance of RpcContext that cannot be serialized.
|
||||
filter_properties.pop('context', None)
|
||||
|
||||
for num, instance_uuid in enumerate(instance_uuids):
|
||||
request_spec['instance_properties']['launch_index'] = num
|
||||
|
||||
try:
|
||||
try:
|
||||
weighed_host = weighed_hosts.pop(0)
|
||||
LOG.info(_("Choosing host %(weighed_host)s "
|
||||
"for instance %(instance_uuid)s"),
|
||||
{'weighed_host': weighed_host,
|
||||
'instance_uuid': instance_uuid})
|
||||
except IndexError:
|
||||
raise exception.NoValidHost(reason="")
|
||||
|
||||
self._provision_resource(context, weighed_host,
|
||||
request_spec,
|
||||
filter_properties,
|
||||
requested_networks,
|
||||
injected_files, admin_password,
|
||||
is_first_time,
|
||||
instance_uuid=instance_uuid,
|
||||
legacy_bdm_in_spec=legacy_bdm_in_spec)
|
||||
except Exception as ex:
|
||||
# NOTE(vish): we don't reraise the exception here to make sure
|
||||
# that all instances in the request get set to
|
||||
# error properly
|
||||
driver.handle_schedule_error(context, ex, instance_uuid,
|
||||
request_spec)
|
||||
# scrub retry host list in case we're scheduling multiple
|
||||
# instances:
|
||||
retry = filter_properties.get('retry', {})
|
||||
retry['hosts'] = []
|
||||
|
||||
self.notifier.info(context, 'scheduler.run_instance.end', payload)
|
||||
|
||||
def _schedule(self, context, request_spec, filter_properties,
|
||||
instance_uuids=None):
|
||||
"""Returns a list of hosts that meet the required specs,
|
||||
@ -88,7 +163,6 @@ class ConstraintSolverScheduler(filter_scheduler.FilterScheduler):
|
||||
update_group_hosts,
|
||||
instance_uuids)
|
||||
|
||||
|
||||
def _get_final_host_list(self, context, request_spec, filter_properties,
|
||||
instance_properties, update_group_hosts=False,
|
||||
instance_uuids=None):
|
||||
@ -100,93 +174,25 @@ class ConstraintSolverScheduler(filter_scheduler.FilterScheduler):
|
||||
# this returns a host iterator
|
||||
hosts = self._get_all_host_states(context)
|
||||
selected_hosts = []
|
||||
hosts = self._get_hosts_stripping_ignored_and_forced(
|
||||
hosts = self.host_manager.get_hosts_stripping_ignored_and_forced(
|
||||
hosts, filter_properties)
|
||||
|
||||
list_hosts = list(hosts)
|
||||
host_instance_tuples_list = self.hosts_solver.host_solve(
|
||||
list_hosts, instance_uuids,
|
||||
request_spec, filter_properties)
|
||||
list_hosts, instance_uuids,
|
||||
request_spec, filter_properties)
|
||||
LOG.debug(_("solver results: %(host_instance_list)s") %
|
||||
{"host_instance_list": host_instance_tuples_list})
|
||||
# NOTE(Yathi): Not using weights in solver scheduler,
|
||||
# but creating a list of WeighedHosts with a default weight of 1
|
||||
# to match the common method signatures of the
|
||||
# FilterScheduler class
|
||||
selected_hosts = [weights.WeighedHost(host, 1)
|
||||
for (host, instance) in
|
||||
host_instance_tuples_list]
|
||||
for (host, instance) in host_instance_tuples_list]
|
||||
for chosen_host in selected_hosts:
|
||||
# Update the host state after deducting the
|
||||
# resource used by the instance
|
||||
chosen_host.obj.consume_from_instance(instance_properties)
|
||||
if update_group_hosts is True:
|
||||
filter_properties['group_hosts'].append(chosen_host.obj.host)
|
||||
filter_properties['group_hosts'].add(chosen_host.obj.host)
|
||||
return selected_hosts
|
||||
|
||||
def _get_hosts_stripping_ignored_and_forced(self, hosts,
|
||||
filter_properties):
|
||||
"""Filter hosts by stripping any ignored hosts and
|
||||
matching any forced hosts or nodes.
|
||||
"""
|
||||
|
||||
def _strip_ignore_hosts(host_map, hosts_to_ignore):
|
||||
ignored_hosts = []
|
||||
for host in hosts_to_ignore:
|
||||
for (hostname, nodename) in host_map.keys():
|
||||
if host == hostname:
|
||||
del host_map[(hostname, nodename)]
|
||||
ignored_hosts.append(host)
|
||||
ignored_hosts_str = ', '.join(ignored_hosts)
|
||||
msg = _('Host filter ignoring hosts: %s')
|
||||
LOG.audit(msg % ignored_hosts_str)
|
||||
|
||||
def _match_forced_hosts(host_map, hosts_to_force):
|
||||
forced_hosts = []
|
||||
for (hostname, nodename) in host_map.keys():
|
||||
if hostname not in hosts_to_force:
|
||||
del host_map[(hostname, nodename)]
|
||||
else:
|
||||
forced_hosts.append(hostname)
|
||||
if host_map:
|
||||
forced_hosts_str = ', '.join(forced_hosts)
|
||||
msg = _('Host filter forcing available hosts to %s')
|
||||
else:
|
||||
forced_hosts_str = ', '.join(hosts_to_force)
|
||||
msg = _("No hosts matched due to not matching "
|
||||
"'force_hosts' value of '%s'")
|
||||
LOG.audit(msg % forced_hosts_str)
|
||||
|
||||
def _match_forced_nodes(host_map, nodes_to_force):
|
||||
forced_nodes = []
|
||||
for (hostname, nodename) in host_map.keys():
|
||||
if nodename not in nodes_to_force:
|
||||
del host_map[(hostname, nodename)]
|
||||
else:
|
||||
forced_nodes.append(nodename)
|
||||
if host_map:
|
||||
forced_nodes_str = ', '.join(forced_nodes)
|
||||
msg = _('Host filter forcing available nodes to %s')
|
||||
else:
|
||||
forced_nodes_str = ', '.join(nodes_to_force)
|
||||
msg = _("No nodes matched due to not matching "
|
||||
"'force_nodes' value of '%s'")
|
||||
LOG.audit(msg % forced_nodes_str)
|
||||
|
||||
ignore_hosts = filter_properties.get('ignore_hosts', [])
|
||||
force_hosts = filter_properties.get('force_hosts', [])
|
||||
force_nodes = filter_properties.get('force_nodes', [])
|
||||
|
||||
if ignore_hosts or force_hosts or force_nodes:
|
||||
# NOTE(deva): we can't assume "host" is unique because
|
||||
# one host may have many nodes.
|
||||
name_to_cls_map = dict([((x.host, x.nodename), x) for x in hosts])
|
||||
if ignore_hosts:
|
||||
_strip_ignore_hosts(name_to_cls_map, ignore_hosts)
|
||||
if not name_to_cls_map:
|
||||
return []
|
||||
# NOTE(deva): allow force_hosts and force_nodes independently
|
||||
if force_hosts:
|
||||
_match_forced_hosts(name_to_cls_map, force_hosts)
|
||||
if force_nodes:
|
||||
_match_forced_nodes(name_to_cls_map, force_nodes)
|
||||
hosts = name_to_cls_map.itervalues()
|
||||
|
||||
return hosts
|
||||
|
535
nova/scheduler/solver_scheduler_host_manager.py
Normal file
535
nova/scheduler/solver_scheduler_host_manager.py
Normal file
@ -0,0 +1,535 @@
|
||||
# Copyright (c) 2011 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Manage hosts in the current zone.
|
||||
"""
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from nova.compute import task_states
|
||||
from nova.compute import vm_states
|
||||
from nova import db
|
||||
from nova.objects import aggregate as aggregate_obj
|
||||
from nova.objects import instance as instance_obj
|
||||
from nova.openstack.common.gettextutils import _
|
||||
from nova.openstack.common import jsonutils
|
||||
from nova.openstack.common import log as logging
|
||||
from nova.openstack.common import timeutils
|
||||
from nova.pci import pci_request
|
||||
from nova.pci import pci_stats
|
||||
from nova.scheduler import host_manager
|
||||
|
||||
physnet_config_file_opts = [
|
||||
cfg.StrOpt('physnet_config_file',
|
||||
default='/etc/neutron/plugins/ml2/ml2_conf_cisco.ini',
|
||||
help='The config file specifying the physical network topology')
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.import_opt('scheduler_available_filters', 'nova.scheduler.host_manager')
|
||||
CONF.import_opt('scheduler_default_filters', 'nova.scheduler.host_manager')
|
||||
CONF.import_opt('scheduler_weight_classes', 'nova.scheduler.host_manager')
|
||||
CONF.register_opts(physnet_config_file_opts)
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HostState(host_manager.HostState):
|
||||
"""Mutable and immutable information tracked for a host.
|
||||
This is an attempt to remove the ad-hoc data structures
|
||||
previously used and lock down access.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(HostState, self).__init__(*args, **kwargs)
|
||||
self.projects = []
|
||||
# For network constraints
|
||||
# NOTE(Xinyuan): currently for POC only, and may require Neurtron
|
||||
self.networks = []
|
||||
self.physnet_config = []
|
||||
self.rack_networks = []
|
||||
# For host aggregate constraints
|
||||
self.host_aggregates_stats = {}
|
||||
|
||||
def update_from_hosted_instances(self, context, compute):
|
||||
service = compute['service']
|
||||
if not service:
|
||||
LOG.warn(_("No service for compute ID %s") % compute['id'])
|
||||
return
|
||||
host = service['host']
|
||||
# retrieve instances for each hosts to extract needed infomation
|
||||
# NOTE: ideally we should use get_by_host_and_node, but there's a bug
|
||||
# in the Icehouse release, that doesn't allow 'expected_attrs' here.
|
||||
instances = instance_obj.InstanceList.get_by_host(context, host,
|
||||
expected_attrs=['info_cache'])
|
||||
# get hosted networks
|
||||
# NOTE(Xinyuan): POC.
|
||||
instance_networks = []
|
||||
for inst in instances:
|
||||
network_info = inst.get('info_cache', {}).get('network_info', [])
|
||||
instance_networks.extend([vif['network']['id']
|
||||
for vif in network_info])
|
||||
self.networks = list(set(instance_networks))
|
||||
|
||||
def update_from_compute_node(self, compute):
|
||||
"""Update information about a host from its compute_node info."""
|
||||
if (self.updated and compute['updated_at']
|
||||
and self.updated > compute['updated_at']):
|
||||
return
|
||||
all_ram_mb = compute['memory_mb']
|
||||
|
||||
# Assume virtual size is all consumed by instances if use qcow2 disk.
|
||||
free_gb = compute['free_disk_gb']
|
||||
least_gb = compute.get('disk_available_least')
|
||||
if least_gb is not None:
|
||||
if least_gb > free_gb:
|
||||
# can occur when an instance in database is not on host
|
||||
LOG.warn(_("Host has more disk space than database expected"
|
||||
" (%(physical)sgb > %(database)sgb)") %
|
||||
{'physical': least_gb, 'database': free_gb})
|
||||
free_gb = min(least_gb, free_gb)
|
||||
free_disk_mb = free_gb * 1024
|
||||
|
||||
self.disk_mb_used = compute['local_gb_used'] * 1024
|
||||
|
||||
#NOTE(jogo) free_ram_mb can be negative
|
||||
self.free_ram_mb = compute['free_ram_mb']
|
||||
self.total_usable_ram_mb = all_ram_mb
|
||||
self.total_usable_disk_gb = compute['local_gb']
|
||||
self.free_disk_mb = free_disk_mb
|
||||
self.vcpus_total = compute['vcpus']
|
||||
self.vcpus_used = compute['vcpus_used']
|
||||
self.updated = compute['updated_at']
|
||||
if 'pci_stats' in compute:
|
||||
self.pci_stats = pci_stats.PciDeviceStats(compute['pci_stats'])
|
||||
else:
|
||||
self.pci_stats = None
|
||||
|
||||
# All virt drivers report host_ip
|
||||
self.host_ip = compute['host_ip']
|
||||
self.hypervisor_type = compute.get('hypervisor_type')
|
||||
self.hypervisor_version = compute.get('hypervisor_version')
|
||||
self.hypervisor_hostname = compute.get('hypervisor_hostname')
|
||||
self.cpu_info = compute.get('cpu_info')
|
||||
if compute.get('supported_instances'):
|
||||
self.supported_instances = jsonutils.loads(
|
||||
compute.get('supported_instances'))
|
||||
|
||||
# Don't store stats directly in host_state to make sure these don't
|
||||
# overwrite any values, or get overwritten themselves. Store in self so
|
||||
# filters can schedule with them.
|
||||
stats = compute.get('stats', None) or '{}'
|
||||
self.stats = jsonutils.loads(stats)
|
||||
|
||||
self.hypervisor_version = compute['hypervisor_version']
|
||||
|
||||
# Track number of instances on host
|
||||
self.num_instances = int(self.stats.get('num_instances', 0))
|
||||
|
||||
# Track number of instances by project_id
|
||||
project_id_keys = [k for k in self.stats.keys() if
|
||||
k.startswith("num_proj_")]
|
||||
for key in project_id_keys:
|
||||
project_id = key[9:]
|
||||
self.num_instances_by_project[project_id] = int(self.stats[key])
|
||||
|
||||
# Track number of instances in certain vm_states
|
||||
vm_state_keys = [k for k in self.stats.keys() if
|
||||
k.startswith("num_vm_")]
|
||||
for key in vm_state_keys:
|
||||
vm_state = key[7:]
|
||||
self.vm_states[vm_state] = int(self.stats[key])
|
||||
|
||||
# Track number of instances in certain task_states
|
||||
task_state_keys = [k for k in self.stats.keys() if
|
||||
k.startswith("num_task_")]
|
||||
for key in task_state_keys:
|
||||
task_state = key[9:]
|
||||
self.task_states[task_state] = int(self.stats[key])
|
||||
|
||||
# Track number of instances by host_type
|
||||
os_keys = [k for k in self.stats.keys() if
|
||||
k.startswith("num_os_type_")]
|
||||
for key in os_keys:
|
||||
os = key[12:]
|
||||
self.num_instances_by_os_type[os] = int(self.stats[key])
|
||||
|
||||
# Track the number of projects on host
|
||||
self.projects = [k[9:] for k in self.stats.keys() if
|
||||
k.startswith("num_proj_") and int(self.stats[k]) > 0]
|
||||
|
||||
self.num_io_ops = int(self.stats.get('io_workload', 0))
|
||||
|
||||
# update metrics
|
||||
self._update_metrics_from_compute_node(compute)
|
||||
|
||||
def consume_from_instance(self, instance):
|
||||
"""Incrementally update host state from an instance."""
|
||||
disk_mb = (instance['root_gb'] + instance['ephemeral_gb']) * 1024
|
||||
ram_mb = instance['memory_mb']
|
||||
vcpus = instance['vcpus']
|
||||
self.free_ram_mb -= ram_mb
|
||||
self.free_disk_mb -= disk_mb
|
||||
self.vcpus_used += vcpus
|
||||
self.updated = timeutils.utcnow()
|
||||
|
||||
# Track number of instances on host
|
||||
self.num_instances += 1
|
||||
|
||||
# Track number of instances by project_id
|
||||
project_id = instance.get('project_id')
|
||||
if project_id not in self.num_instances_by_project:
|
||||
self.num_instances_by_project[project_id] = 0
|
||||
self.num_instances_by_project[project_id] += 1
|
||||
|
||||
# Track number of instances in certain vm_states
|
||||
vm_state = instance.get('vm_state', vm_states.BUILDING)
|
||||
if vm_state not in self.vm_states:
|
||||
self.vm_states[vm_state] = 0
|
||||
self.vm_states[vm_state] += 1
|
||||
|
||||
# Track number of instances in certain task_states
|
||||
task_state = instance.get('task_state')
|
||||
if task_state not in self.task_states:
|
||||
self.task_states[task_state] = 0
|
||||
self.task_states[task_state] += 1
|
||||
|
||||
# Track number of instances by host_type
|
||||
os_type = instance.get('os_type')
|
||||
if os_type not in self.num_instances_by_os_type:
|
||||
self.num_instances_by_os_type[os_type] = 0
|
||||
self.num_instances_by_os_type[os_type] += 1
|
||||
|
||||
pci_requests = pci_request.get_instance_pci_requests(instance)
|
||||
if pci_requests and self.pci_stats:
|
||||
self.pci_stats.apply_requests(pci_requests)
|
||||
|
||||
vm_state = instance.get('vm_state', vm_states.BUILDING)
|
||||
task_state = instance.get('task_state')
|
||||
if vm_state == vm_states.BUILDING or task_state in [
|
||||
task_states.RESIZE_MIGRATING, task_states.REBUILDING,
|
||||
task_states.RESIZE_PREP, task_states.IMAGE_SNAPSHOT,
|
||||
task_states.IMAGE_BACKUP]:
|
||||
self.num_io_ops += 1
|
||||
|
||||
# Track the number of projects
|
||||
project_id = instance.get('project_id')
|
||||
if project_id not in self.projects:
|
||||
self.projects.append(project_id)
|
||||
|
||||
# Track aggregate stats
|
||||
project_id = instance.get('project_id')
|
||||
for aggr in self.host_aggregates_stats:
|
||||
aggregate_project_list = self.host_aggregates_stats[aggr].get(
|
||||
'projects', [])
|
||||
if project_id not in aggregate_project_list:
|
||||
self.host_aggregates_stats[aggr]['projects'].append(project_id)
|
||||
|
||||
def update_from_networks(self, requested_networks):
|
||||
for network_id, fixed_ip, port_id in requested_networks:
|
||||
if network_id:
|
||||
if network_id not in self.networks:
|
||||
self.networks.append(network_id)
|
||||
if not network_id not in self.aggregated_networks:
|
||||
for device in self.aggregated_networks:
|
||||
self.aggregated_networks[device].append(network_id)
|
||||
# do this for host aggregates
|
||||
for aggr in self.host_aggregates_stats:
|
||||
host_aggr_network_list = self.host_aggregates_stats[
|
||||
aggr].get('networks', [])
|
||||
if network_id not in host_aggr_network_list:
|
||||
self.host_aggregates_stats[aggr][
|
||||
'networks'].append(network_id)
|
||||
|
||||
def __repr__(self):
|
||||
return ("(%s, %s) ram:%s disk:%s io_ops:%s instances:%s "
|
||||
"physnet_config:%s networks:%s rack_networks:%s "
|
||||
"projects:%s aggregate_stats:%s" %
|
||||
(self.host, self.nodename, self.free_ram_mb, self.free_disk_mb,
|
||||
self.num_io_ops, self.num_instances, self.physnet_config,
|
||||
self.networks, self.rack_networks, self.projects,
|
||||
self.host_aggregates_stats))
|
||||
|
||||
|
||||
class SolverSchedulerHostManager(host_manager.HostManager):
|
||||
"""HostManager class for solver scheduler."""
|
||||
|
||||
# Can be overridden in a subclass
|
||||
host_state_cls = HostState
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(SolverSchedulerHostManager, self).__init__(*args, **kwargs)
|
||||
|
||||
def get_hosts_stripping_ignored_and_forced(self, hosts,
|
||||
filter_properties):
|
||||
"""Filter hosts by stripping any ignored hosts and
|
||||
matching any forced hosts or nodes.
|
||||
"""
|
||||
|
||||
def _strip_ignore_hosts(host_map, hosts_to_ignore):
|
||||
ignored_hosts = []
|
||||
for host in hosts_to_ignore:
|
||||
for (hostname, nodename) in host_map.keys():
|
||||
if host == hostname:
|
||||
del host_map[(hostname, nodename)]
|
||||
ignored_hosts.append(host)
|
||||
ignored_hosts_str = ', '.join(ignored_hosts)
|
||||
msg = _('Host filter ignoring hosts: %s')
|
||||
LOG.audit(msg % ignored_hosts_str)
|
||||
|
||||
def _match_forced_hosts(host_map, hosts_to_force):
|
||||
forced_hosts = []
|
||||
for (hostname, nodename) in host_map.keys():
|
||||
if hostname not in hosts_to_force:
|
||||
del host_map[(hostname, nodename)]
|
||||
else:
|
||||
forced_hosts.append(hostname)
|
||||
if host_map:
|
||||
forced_hosts_str = ', '.join(forced_hosts)
|
||||
msg = _('Host filter forcing available hosts to %s')
|
||||
else:
|
||||
forced_hosts_str = ', '.join(hosts_to_force)
|
||||
msg = _("No hosts matched due to not matching "
|
||||
"'force_hosts' value of '%s'")
|
||||
LOG.audit(msg % forced_hosts_str)
|
||||
|
||||
def _match_forced_nodes(host_map, nodes_to_force):
|
||||
forced_nodes = []
|
||||
for (hostname, nodename) in host_map.keys():
|
||||
if nodename not in nodes_to_force:
|
||||
del host_map[(hostname, nodename)]
|
||||
else:
|
||||
forced_nodes.append(nodename)
|
||||
if host_map:
|
||||
forced_nodes_str = ', '.join(forced_nodes)
|
||||
msg = _('Host filter forcing available nodes to %s')
|
||||
else:
|
||||
forced_nodes_str = ', '.join(nodes_to_force)
|
||||
msg = _("No nodes matched due to not matching "
|
||||
"'force_nodes' value of '%s'")
|
||||
LOG.audit(msg % forced_nodes_str)
|
||||
|
||||
ignore_hosts = filter_properties.get('ignore_hosts', [])
|
||||
force_hosts = filter_properties.get('force_hosts', [])
|
||||
force_nodes = filter_properties.get('force_nodes', [])
|
||||
|
||||
if ignore_hosts or force_hosts or force_nodes:
|
||||
# NOTE(deva): we can't assume "host" is unique because
|
||||
# one host may have many nodes.
|
||||
name_to_cls_map = dict([((x.host, x.nodename), x) for x in hosts])
|
||||
if ignore_hosts:
|
||||
_strip_ignore_hosts(name_to_cls_map, ignore_hosts)
|
||||
if not name_to_cls_map:
|
||||
return []
|
||||
# NOTE(deva): allow force_hosts and force_nodes independently
|
||||
if force_hosts:
|
||||
_match_forced_hosts(name_to_cls_map, force_hosts)
|
||||
if force_nodes:
|
||||
_match_forced_nodes(name_to_cls_map, force_nodes)
|
||||
hosts = name_to_cls_map.itervalues()
|
||||
|
||||
return hosts
|
||||
|
||||
def get_filtered_hosts(self, hosts, filter_properties,
|
||||
filter_class_names=None, index=0):
|
||||
"""Filter hosts and return only ones passing all filters."""
|
||||
# NOTE(Yathi): Calling the method to apply ignored and forced options
|
||||
hosts = self.get_hosts_stripping_ignored_and_forced(hosts,
|
||||
filter_properties)
|
||||
|
||||
force_hosts = filter_properties.get('force_hosts', [])
|
||||
force_nodes = filter_properties.get('force_nodes', [])
|
||||
|
||||
if force_hosts or force_nodes:
|
||||
# NOTE: Skip filters when forcing host or node
|
||||
return list(hosts)
|
||||
|
||||
filter_classes = self._choose_host_filters(filter_class_names)
|
||||
|
||||
return self.filter_handler.get_filtered_objects(filter_classes,
|
||||
hosts, filter_properties, index)
|
||||
|
||||
def _get_aggregate_stats(self, context, host_state_map):
|
||||
"""Update certain stats for the aggregates of the hosts."""
|
||||
aggregates = aggregate_obj.AggregateList.get_all(context)
|
||||
host_state_list_map = {}
|
||||
|
||||
for (host, node) in host_state_map.keys():
|
||||
current_list = host_state_list_map.get(host, None)
|
||||
state = host_state_map[(host, node)]
|
||||
if not current_list:
|
||||
host_state_list_map[host] = [state]
|
||||
else:
|
||||
host_state_list_map[host] = current_list.append(state)
|
||||
|
||||
for aggregate in aggregates:
|
||||
hosts = aggregate.hosts
|
||||
projects = set()
|
||||
networks = set()
|
||||
# Collect all the projects from all the member hosts
|
||||
aggr_host_states = []
|
||||
for host in hosts:
|
||||
host_state_list = host_state_list_map.get(host, None) or []
|
||||
aggr_host_states += host_state_list
|
||||
for host_state in host_state_list:
|
||||
projects = projects.union(host_state.projects)
|
||||
networks = networks.union(host_state.networks)
|
||||
aggregate_stats = {'hosts': hosts,
|
||||
'projects': list(projects),
|
||||
'networks': list(networks),
|
||||
'metadata': aggregate.metadata}
|
||||
# Now set this value to all the member host_states
|
||||
for host_state in aggr_host_states:
|
||||
host_state.host_aggregates_stats[
|
||||
aggregate.name] = aggregate_stats
|
||||
|
||||
def _get_rack_states(self, context, host_state_map):
|
||||
"""Retrieve the physical and virtual network states of the hosts.
|
||||
"""
|
||||
def _get_physnet_mappings():
|
||||
"""Get physical network topologies from a Neutron config file.
|
||||
This is a hard-coded function which only supports Cisco Nexus
|
||||
driver for Neutron ML2 plugin currently.
|
||||
"""
|
||||
# NOTE(Xinyuan): This feature is for POC only!
|
||||
# TODO(Xinyuan): further works are required in implementing
|
||||
# Neutron API extensions to get related information.
|
||||
host2device_map = {}
|
||||
device2host_map = {}
|
||||
sections = {}
|
||||
|
||||
state_keys = host_state_map.keys()
|
||||
hostname_list = [host for (host, node) in state_keys]
|
||||
|
||||
try:
|
||||
physnet_config_parser = cfg.ConfigParser(
|
||||
CONF.physnet_config_file, sections)
|
||||
physnet_config_parser.parse()
|
||||
except Exception:
|
||||
LOG.warn(_("Physnet config file was not parsed properly."))
|
||||
# Example section:
|
||||
# [ml2_mech_cisco_nexus:1.1.1.1]
|
||||
# compute1=1/1
|
||||
# compute2=1/2
|
||||
# ssh_port=22
|
||||
# username=admin
|
||||
# password=mySecretPassword
|
||||
for parsed_item in sections.keys():
|
||||
dev_id, sep, dev_ip = parsed_item.partition(':')
|
||||
if dev_id.lower() == 'ml2_mech_cisco_nexus':
|
||||
for key, value in sections[parsed_item].items():
|
||||
if key in hostname_list:
|
||||
hostname = key
|
||||
portid = value[0]
|
||||
host2device_map.setdefault(hostname, [])
|
||||
host2device_map[hostname].append((dev_ip, portid))
|
||||
device2host_map.setdefault(dev_ip, [])
|
||||
device2host_map[dev_ip].append((hostname, portid))
|
||||
return host2device_map, device2host_map
|
||||
|
||||
def _get_rack_networks(host_dev_map, dev_host_map, host_state_map):
|
||||
"""Aggregate the networks associated with a group of hosts in
|
||||
same physical groups (e.g. under same ToR switches...)
|
||||
"""
|
||||
rack_networks = {}
|
||||
|
||||
if not dev_host_map or not host_dev_map:
|
||||
return rack_networks
|
||||
|
||||
host_networks = {}
|
||||
for state_key in host_state_map.keys():
|
||||
(host, node) = state_key
|
||||
host_state = host_state_map[state_key]
|
||||
host_networks.setdefault(host, set())
|
||||
host_networks[host] = host_networks[host].union(
|
||||
host_state.networks)
|
||||
|
||||
# aggregate hosted networks for each upper level device
|
||||
dev_networks = {}
|
||||
for dev_id in dev_host_map.keys():
|
||||
current_dev_networks = set()
|
||||
for (host_name, port_id) in dev_host_map[dev_id]:
|
||||
current_dev_networks = current_dev_networks.union(
|
||||
host_networks.get(host_name, []))
|
||||
dev_networks[dev_id] = list(current_dev_networks)
|
||||
|
||||
# make aggregated networks list for each hosts
|
||||
for host_name in host_dev_map.keys():
|
||||
dev_list = list(set([dev_id for (dev_id, physport)
|
||||
in host_dev_map.get(host_name, [])]))
|
||||
host_rack_networks = {}
|
||||
for dev in dev_list:
|
||||
host_rack_networks[dev] = dev_networks.get(dev, [])
|
||||
rack_networks[host_name] = host_rack_networks
|
||||
|
||||
return rack_networks
|
||||
|
||||
host_dev_map, dev_host_map = _get_physnet_mappings()
|
||||
rack_networks = _get_rack_networks(
|
||||
host_dev_map, dev_host_map, host_state_map)
|
||||
|
||||
for state_key in host_state_map.keys():
|
||||
host_state = self.host_state_map[state_key]
|
||||
(host, node) = state_key
|
||||
host_state.physnet_config = host_dev_map.get(host, [])
|
||||
host_state.rack_networks = rack_networks.get(host, [])
|
||||
|
||||
def get_all_host_states(self, context):
|
||||
"""Returns a list of HostStates that represents all the hosts
|
||||
the HostManager knows about. Also, each of the consumable resources
|
||||
in HostState are pre-populated and adjusted based on data in the db.
|
||||
"""
|
||||
|
||||
# Get resource usage across the available compute nodes:
|
||||
compute_nodes = db.compute_node_get_all(context)
|
||||
seen_nodes = set()
|
||||
for compute in compute_nodes:
|
||||
service = compute['service']
|
||||
if not service:
|
||||
LOG.warn(_("No service for compute ID %s") % compute['id'])
|
||||
continue
|
||||
host = service['host']
|
||||
node = compute.get('hypervisor_hostname')
|
||||
state_key = (host, node)
|
||||
capabilities = self.service_states.get(state_key, None)
|
||||
host_state = self.host_state_map.get(state_key)
|
||||
if host_state:
|
||||
host_state.update_capabilities(capabilities,
|
||||
dict(service.iteritems()))
|
||||
else:
|
||||
host_state = self.host_state_cls(host, node,
|
||||
capabilities=capabilities,
|
||||
service=dict(service.iteritems()))
|
||||
self.host_state_map[state_key] = host_state
|
||||
host_state.update_from_compute_node(compute)
|
||||
# update information from hosted instances
|
||||
host_state.update_from_hosted_instances(context, compute)
|
||||
seen_nodes.add(state_key)
|
||||
|
||||
# remove compute nodes from host_state_map if they are not active
|
||||
dead_nodes = set(self.host_state_map.keys()) - seen_nodes
|
||||
for state_key in dead_nodes:
|
||||
host, node = state_key
|
||||
LOG.info(_("Removing dead compute node %(host)s:%(node)s "
|
||||
"from scheduler") % {'host': host, 'node': node})
|
||||
del self.host_state_map[state_key]
|
||||
|
||||
# get information from groups of hosts
|
||||
# NOTE(Xinyaun): currently for POC only.
|
||||
self._get_rack_states(context, self.host_state_map)
|
||||
self._get_aggregate_stats(context, self.host_state_map)
|
||||
|
||||
return self.host_state_map.itervalues()
|
Loading…
Reference in New Issue
Block a user