Claim container allocation in placement

Change-Id: I3e290bd22815ac8dbd968ad43652880b969c04d8
This commit is contained in:
Hongbin Lu 2019-08-05 22:26:29 +00:00
parent 4479d5758c
commit 7b3b1c5e07
25 changed files with 645 additions and 50 deletions

View File

@ -738,3 +738,30 @@ def convert_mb_to_ceil_gb(mb_value):
# ensure we reserve/allocate enough space by rounding up to nearest GB
gb_int = int(math.ceil(gb_float))
return gb_int
if hasattr(inspect, 'getfullargspec'):
getargspec = inspect.getfullargspec
else:
getargspec = inspect.getargspec
def expects_func_args(*args):
def _decorator_checker(dec):
@functools.wraps(dec)
def _decorator(f):
base_f = get_wrapped_function(f)
argspec = getargspec(base_f)
if argspec[1] or argspec[2] or set(args) <= set(argspec[0]):
# NOTE (ndipanov): We can't really tell if correct stuff will
# be passed if it's a function with *args or **kwargs so
# we still carry on and hope for the best
return dec(f)
else:
raise TypeError("Decorated function %(f_name)s does not "
"have the arguments expected by the "
"decorator %(d_name)s" %
{'f_name': base_f.__name__,
'd_name': dec.__name__})
return _decorator
return _decorator_checker

View File

@ -44,6 +44,7 @@ class ComputeNodeTracker(object):
self.scheduler_client = scheduler_client.SchedulerClient()
self.pci_tracker = None
self.reportclient = reportclient
self.rp_uuid = None
def _setup_pci_tracker(self, context, compute_node):
if not self.pci_tracker:
@ -72,6 +73,7 @@ class ComputeNodeTracker(object):
LOG.info('Node created for :%(host)s', {'host': self.host})
else:
self._copy_resources(node, resources)
node.rp_uuid = self._get_node_rp_uuid(context, node)
self._setup_pci_tracker(context, node)
self.compute_node = node
self._update_available_resource(context)
@ -97,6 +99,21 @@ class ComputeNodeTracker(object):
LOG.warning("No compute node record for: %(host)s",
{'host': self.host})
def _get_node_rp_uuid(self, context, node):
if self.rp_uuid:
return self.rp_uuid
if CONF.compute.host_shared_with_nova:
try:
self.rp_uuid = self.reportclient.get_provider_by_name(
context, node.hostname)['uuid']
except exception.ResourceProviderNotFound:
raise exception.ComputeHostNotFound(host=node.nodename)
else:
self.rp_uuid = node.uuid
return self.rp_uuid
@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
def container_claim(self, context, container, pci_requests, limits=None):
"""Indicate resources are needed for an upcoming container build.
@ -306,14 +323,7 @@ class ComputeNodeTracker(object):
def _update_to_placement(self, context, compute_node):
"""Send resource and inventory changes to placement."""
nodename = compute_node.hostname
node_rp_uuid = compute_node.uuid
if CONF.compute.host_shared_with_nova:
try:
node_rp_uuid = self.reportclient.get_provider_by_name(
context, nodename)['uuid']
except exception.ResourceProviderNotFound:
raise exception.ComputeHostNotFound(host=nodename)
node_rp_uuid = self._get_node_rp_uuid(context, compute_node)
# Persist the stats to the Scheduler
# First try update_provider_tree
# Retrieve the provider tree associated with this compute node. If

View File

@ -368,6 +368,8 @@ class Manager(periodic_task.PeriodicTasks):
six.text_type(e))
self._fail_container(context, container, six.text_type(e),
unset_host=True)
self.reportclient.delete_allocation_for_container(
context, container.uuid)
def _attach_volumes_for_capsule(self, context, capsule, requested_volumes):
for c in (capsule.init_containers or []):
@ -521,12 +523,15 @@ class Manager(periodic_task.PeriodicTasks):
self._detach_volumes(context, container, reraise=reraise)
container.destroy(context)
self._get_resource_tracker()
# Remove the claimed resource
rt = self._get_resource_tracker()
rt.remove_usage_from_container(context, container, True)
self.reportclient.delete_allocation_for_container(context,
container.uuid)
# only destroy the container in the db if the
# delete_allocation_for_instance doesn't raise and therefore
# allocation is successfully deleted in placement
container.destroy(context)
def add_security_group(self, context, container, security_group):
@utils.synchronized(container.uuid)

View File

@ -68,7 +68,7 @@ in Zun is ``runc``."""),
help='The maximum disk size in GB that user can set '
'when run/create container.'),
cfg.IntOpt('default_memory',
default=2048,
default=512,
help='The default memory in MB a container can use '
'(will be used if user do not specify '
'container\'s memory). This value should be '

View File

@ -0,0 +1,59 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""add rp_uuid to compute_node
Revision ID: d502ce8fb705
Revises: b2bda272f4dd
Create Date: 2019-08-25 15:27:06.626340
"""
# revision identifiers, used by Alembic.
revision = 'd502ce8fb705'
down_revision = 'b2bda272f4dd'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
COMPUTE_NODE_TABLE = sa.Table(
'compute_node', sa.MetaData(),
sa.Column('uuid', sa.String(36), primary_key=True, nullable=False),
sa.Column('rp_uuid', sa.String(36), nullable=True))
def upgrade():
op.add_column('compute_node',
sa.Column('rp_uuid', sa.String(length=36), nullable=True))
op.create_unique_constraint('uniq_compute_node0rp_uuid',
'compute_node', ['rp_uuid'])
# perform data migration between tables
session = sa.orm.Session(bind=op.get_bind())
with session.begin(subtransactions=True):
for row in session.query(COMPUTE_NODE_TABLE):
session.execute(
COMPUTE_NODE_TABLE.update().values(
rp_uuid=row.uuid).where(
COMPUTE_NODE_TABLE.c.uuid == row.uuid)
)
# this commit is necessary to allow further operations
session.commit()
op.alter_column('compute_node', 'rp_uuid',
nullable=False,
existing_type=sa.String(length=36),
existing_nullable=True,
existing_server_default=False)

View File

@ -807,7 +807,7 @@ class Connection(object):
return ref
def _add_compute_nodes_filters(self, query, filters):
filter_names = ['hostname']
filter_names = ['hostname', 'rp_uuid']
return self._add_filters(query, models.ComputeNode, filters=filters,
filter_names=filter_names)
@ -823,6 +823,8 @@ class Connection(object):
# ensure defaults are present for new compute nodes
if not values.get('uuid'):
values['uuid'] = uuidutils.generate_uuid()
if not values.get('rp_uuid'):
values['rp_uuid'] = values['uuid']
compute_node = models.ComputeNode()
compute_node.update(values)

View File

@ -368,6 +368,7 @@ class ComputeNode(Base):
table_args()
)
uuid = Column(String(36), primary_key=True, nullable=False)
rp_uuid = Column(String(36), nullable=False)
hostname = Column(String(255), nullable=False)
numa_topology = Column(JSONEncodedDict, nullable=True)
mem_total = Column(Integer, nullable=False, default=0)

View File

@ -35,10 +35,12 @@ class ComputeNode(base.ZunPersistentObject, base.ZunObject):
# Version 1.11: Add disk_quota_supported field
# Version 1.12: Add runtimes field
# Version 1.13: Add enable_cpu_pinning field
VERSION = '1.13'
# Version 1.14: Add rp_uuid field
VERSION = '1.14'
fields = {
'uuid': fields.UUIDField(read_only=True, nullable=False),
'rp_uuid': fields.UUIDField(nullable=False),
'numa_topology': fields.ObjectField('NUMATopology', nullable=True),
'hostname': fields.StringField(nullable=False),
'mem_total': fields.IntegerField(nullable=False),

View File

@ -13,24 +13,82 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
from keystoneauth1 import exceptions as ks_exc
from oslo_log import log as logging
from stevedore import driver
from zun.common import consts
from zun.common import exception
import zun.conf
from zun.scheduler.client import report
from zun.scheduler import request_filter
from zun.scheduler import utils
CONF = zun.conf.CONF
LOG = logging.getLogger(__name__)
class SchedulerClient(object):
"""Client library for placing calls to the scheduler."""
def __init__(self):
self.placement_client = report.SchedulerReportClient()
scheduler_driver = CONF.scheduler.driver
self.driver = driver.DriverManager(
"zun.scheduler.driver",
scheduler_driver,
invoke_on_load=True).driver
self.traits_ensured = False
def select_destinations(self, context, containers, extra_spec):
return self.driver.select_destinations(context, containers, extra_spec)
def select_destinations(self, context, containers, extra_specs):
LOG.debug("Starting to schedule for containers: %s",
[c.uuid for c in containers])
if not self.traits_ensured:
self.placement_client._ensure_traits(context, consts.CUSTOM_TRAITS)
self.traits_ensured = True
alloc_reqs_by_rp_uuid, provider_summaries, allocation_request_version \
= None, None, None
request_filter.process_reqspec(context, extra_specs)
resources = utils.resources_from_request_spec(
context, containers[0], extra_specs)
try:
res = self.placement_client.get_allocation_candidates(context,
resources)
(alloc_reqs, provider_summaries, allocation_request_version) = res
except (ks_exc.EndpointNotFound,
ks_exc.MissingAuthPlugin,
ks_exc.Unauthorized,
ks_exc.DiscoveryFailure,
ks_exc.ConnectFailure):
# We have to handle the case that we failed to connect to the
# Placement service.
alloc_reqs, provider_summaries, allocation_request_version = (
None, None, None)
if not alloc_reqs:
LOG.info("Got no allocation candidates from the Placement "
"API. This could be due to insufficient resources "
"or a temporary occurrence as compute nodes start "
"up.")
raise exception.NoValidHost(reason="")
else:
# Build a dict of lists of allocation requests, keyed by
# provider UUID, so that when we attempt to claim resources for
# a host, we can grab an allocation request easily
alloc_reqs_by_rp_uuid = collections.defaultdict(list)
for ar in alloc_reqs:
for rp_uuid in ar['allocations']:
alloc_reqs_by_rp_uuid[rp_uuid].append(ar)
selections = self.driver.select_destinations(
context, containers, extra_specs, alloc_reqs_by_rp_uuid,
provider_summaries, allocation_request_version)
return selections
def update_resource(self, node):
node.save()

View File

@ -210,7 +210,8 @@ class SchedulerReportClient(object):
headers = ({request_id.INBOUND_HEADER: global_request_id}
if global_request_id else {})
return self._client.get(url, endpoint_filter=self._ks_filter,
microversion=version, headers=headers)
microversion=version, headers=headers,
logger=LOG)
def post(self, url, data, version=None, global_request_id=None):
headers = ({request_id.INBOUND_HEADER: global_request_id}
@ -221,7 +222,7 @@ class SchedulerReportClient(object):
# ecosystem.
return self._client.post(url, endpoint_filter=self._ks_filter,
json=data, microversion=version,
headers=headers)
headers=headers, logger=LOG)
def put(self, url, data, version=None, global_request_id=None):
# NOTE(sdague): using json= instead of data= sets the
@ -234,13 +235,14 @@ class SchedulerReportClient(object):
global_request_id} if global_request_id else {}}
if data is not None:
kwargs['json'] = data
return self._client.put(url, **kwargs)
return self._client.put(url, logger=LOG, **kwargs)
def delete(self, url, version=None, global_request_id=None):
headers = ({request_id.INBOUND_HEADER: global_request_id}
if global_request_id else {})
return self._client.delete(url, endpoint_filter=self._ks_filter,
microversion=version, headers=headers)
microversion=version, headers=headers,
logger=LOG)
def get_allocation_candidates(self, context, resources):
"""Returns a tuple of (allocation_requests, provider_summaries,
@ -2071,7 +2073,7 @@ class SchedulerReportClient(object):
compute node
"""
host = compute_node.hostname
rp_uuid = compute_node.uuid
rp_uuid = compute_node.rp_uuid
if cascade:
# Delete any allocations for this resource provider.
# Since allocations are by consumer, we get the consumers on this

View File

@ -44,10 +44,12 @@ class Scheduler(object):
and not service.disabled]
@abc.abstractmethod
def select_destinations(self, context, containers, extra_spec):
def select_destinations(self, context, containers, extra_specs,
alloc_reqs_by_rp_uuid, provider_summaries,
allocation_request_version=None):
"""Must override select_destinations method.
:return: A list of dicts with 'host', 'nodename' and 'limits' as keys
that satisfies the extra_spec and filter_properties.
that satisfies the extra_specs and filter_properties.
"""
raise NotImplementedError()

View File

@ -15,18 +15,22 @@ The FilterScheduler is for scheduling container to a host according to
your filters configured.
You can customize this scheduler by specifying your own Host Filters.
"""
import random
from oslo_log.log import logging
from zun.common import exception
from zun.common.i18n import _
import zun.conf
from zun import objects
from zun.scheduler.client import report
from zun.scheduler import driver
from zun.scheduler import filters
from zun.scheduler.host_state import HostState
from zun.scheduler import utils
CONF = zun.conf.CONF
LOG = logging.getLogger(__name__)
class FilterScheduler(driver.Scheduler):
@ -40,29 +44,93 @@ class FilterScheduler(driver.Scheduler):
self.filter_cls_map = {cls.__name__: cls for cls in filter_classes}
self.filter_obj_map = {}
self.enabled_filters = self._choose_host_filters(self._load_filters())
self.placement_client = report.SchedulerReportClient()
def _schedule(self, context, container, extra_spec):
def _schedule(self, context, container, extra_specs, alloc_reqs_by_rp_uuid,
provider_summaries, allocation_request_version=None):
"""Picks a host according to filters."""
elevated = context.elevated()
# NOTE(jaypipes): provider_summaries being None is treated differently
# from an empty dict. provider_summaries is None when we want to grab
# all compute nodes.
# The provider_summaries variable will be an empty dict when the
# Placement API found no providers that match the requested
# constraints, which in turn makes compute_uuids an empty list and
# objects.ComputeNode.list will return an empty list
# also, which will eventually result in a NoValidHost error.
compute_uuids = None
if provider_summaries is not None:
compute_uuids = list(provider_summaries.keys())
if compute_uuids is None:
nodes = objects.ComputeNode.list(context)
else:
nodes = objects.ComputeNode.list(
context, filters={'rp_uuid': compute_uuids})
services = self._get_services_by_host(context)
nodes = objects.ComputeNode.list(context)
hosts = services.keys()
nodes = [node for node in nodes if node.hostname in hosts]
host_states = self.get_all_host_state(nodes, services)
hosts = self.filter_handler.get_filtered_objects(self.enabled_filters,
host_states,
container,
extra_spec)
extra_specs)
if not hosts:
msg = _("Is the appropriate service running?")
raise exception.NoValidHost(reason=msg)
return random.choice(hosts)
# Attempt to claim the resources against one or more resource
# providers, looping over the sorted list of possible hosts
# looking for an allocation_request that contains that host's
# resource provider UUID
claimed_host = None
for host in hosts:
cn_uuid = host.uuid
if cn_uuid not in alloc_reqs_by_rp_uuid:
msg = ("A host state with uuid = '%s' that did not have a "
"matching allocation_request was encountered while "
"scheduling. This host was skipped.")
LOG.debug(msg, cn_uuid)
continue
def select_destinations(self, context, containers, extra_spec):
alloc_reqs = alloc_reqs_by_rp_uuid[cn_uuid]
# TODO(jaypipes): Loop through all allocation_requests instead
# of just trying the first one. For now, since we'll likely
# want to order the allocation_requests in the future based on
# information in the provider summaries, we'll just try to
# claim resources using the first allocation_request
alloc_req = alloc_reqs[0]
if utils.claim_resources(
elevated, self.placement_client, container, alloc_req,
allocation_request_version=allocation_request_version):
claimed_host = host
break
if claimed_host is None:
# We weren't able to claim resources in the placement API
# for any of the sorted hosts identified. So, clean up any
# successfully-claimed resources for prior containers in
# this request and return an empty list which will cause
# select_destinations() to raise NoValidHost
msg = _("Unable to successfully claim against any host.")
raise exception.NoValidHost(reason=msg)
# Now consume the resources so the filter/weights will change for
# the next container.
self._consume_selected_host(claimed_host, container)
return claimed_host
def select_destinations(self, context, containers, extra_specs,
alloc_reqs_by_rp_uuid, provider_summaries,
allocation_request_version=None):
"""Selects destinations by filters."""
dests = []
for container in containers:
host = self._schedule(context, container, extra_spec)
host = self._schedule(context, container, extra_specs,
alloc_reqs_by_rp_uuid, provider_summaries,
allocation_request_version)
host_state = dict(host=host.hostname, nodename=None,
limits=host.limits)
dests.append(host_state)
@ -117,3 +185,8 @@ class FilterScheduler(driver.Scheduler):
service=services.get(node.hostname))
host_states.append(host_state)
return host_states
@staticmethod
def _consume_selected_host(selected_host, container):
LOG.debug("Selected host: %(host)s", {'host': selected_host})
selected_host.consume_from_request(container)

View File

@ -10,7 +10,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
from oslo_log.log import logging
from oslo_utils import timeutils
from zun.common import utils
from zun.pci import stats as pci_stats
@ -26,6 +29,8 @@ class HostState(object):
def __init__(self, host):
self.hostname = host
self._lock_name = host
self.uuid = None
# Mutable available resources.
# These will change as resources are virtually "consumed".
@ -47,6 +52,8 @@ class HostState(object):
# Resource oversubscription values for the compute host:
self.limits = {}
self.updated = None
def update(self, compute_node=None, service=None):
"""Update information about a host"""
@utils.synchronized((self.hostname, compute_node))
@ -63,6 +70,11 @@ class HostState(object):
def _update_from_compute_node(self, compute_node):
"""Update information about a host from a Compute object"""
if (self.updated and compute_node.updated_at and
self.updated > compute_node.updated_at):
return
self.uuid = compute_node.rp_uuid
self.mem_available = compute_node.mem_available
self.mem_total = compute_node.mem_total
self.mem_free = compute_node.mem_free
@ -78,6 +90,31 @@ class HostState(object):
self.disk_quota_supported = compute_node.disk_quota_supported
self.runtimes = compute_node.runtimes
self.enable_cpu_pinning = compute_node.enable_cpu_pinning
self.updated = compute_node.updated_at
def consume_from_request(self, container):
"""Incrementally update host state from a Container object."""
@utils.synchronized(self._lock_name)
@set_update_time_on_success
def _locked(self, container):
# Scheduler API is inherently multi-threaded as every incoming RPC
# message will be dispatched in its own green thread. So the
# shared host state should be consumed in a consistent way to make
# sure its data is valid under concurrent write operations.
self._locked_consume_from_request(container)
return _locked(self, container)
def _locked_consume_from_request(self, container):
disk = container.disk if container.disk else 0
ram_mb = int(container.memory) if container.memory else 0
vcpus = container.cpu if container.cpu else 0
self.mem_used += ram_mb
self.disk_used += disk
self.cpu_used += vcpus
self.mem_free = self.mem_total - self.mem_used
# TODO(hongbin): track numa_topology and pci devices
def __repr__(self):
return ("%(host)s ram: %(free_ram)sMB "
@ -86,3 +123,26 @@ class HostState(object):
'free_ram': self.mem_free,
'free_disk': self.disk_total - self.disk_used,
'free_cpu': self.cpus - self.cpu_used})
@utils.expects_func_args('self', 'container')
def set_update_time_on_success(function):
"""Set updated time of HostState when consuming succeed."""
@functools.wraps(function)
def decorated_function(self, container):
return_value = None
try:
return_value = function(self, container)
except Exception as e:
# Ignores exception raised from consume_from_request() so that
# booting container would fail in the resource claim of compute
# node, other suitable node may be chosen during scheduling retry.
LOG.warning("Selected host: %(host)s failed to consume from "
"container. Error: %(error)s",
{'host': self.hostname, 'error': e})
else:
self.updated = timeutils.utcnow()
return return_value
return decorated_function

View File

@ -0,0 +1,46 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from zun.common import consts
import zun.conf
CONF = zun.conf.CONF
LOG = logging.getLogger(__name__)
def compute_status_filter(ctxt, extra_specs):
"""Pre-filter node resource providers using ZUN_COMPUTE_STATUS_DISABLED
The ComputeFilter filters out hosts for compute services that are
disabled. Compute node resource providers managed by a disabled compute
service should have the ZUN_COMPUTE_STATUS_DISABLED trait set and be
excluded by this mandatory pre-filter.
"""
trait_name = consts.ZUN_COMPUTE_STATUS_DISABLED
extra_specs['trait:%s' % trait_name] = 'forbidden'
LOG.debug('compute_status_filter request filter added forbidden '
'trait %s', trait_name)
return True
ALL_REQUEST_FILTERS = [
compute_status_filter,
]
def process_reqspec(ctxt, extra_specs):
"""Process an objects.ReqestSpec before calling placement."""
for filter in ALL_REQUEST_FILTERS:
filter(ctxt, extra_specs)

View File

@ -15,6 +15,7 @@
"""Utility methods for scheduling."""
import collections
import math
import re
import os_resource_classes as orc
@ -26,7 +27,6 @@ from zun import objects
LOG = logging.getLogger(__name__)
CONF = zun.conf.CONF
@ -280,3 +280,133 @@ class ResourceRequest(object):
qparams.extend(to_queryparams(rg, ident or ''))
return parse.urlencode(sorted(qparams))
def resources_from_request_spec(ctxt, container_obj, extra_specs):
"""Given a Container object, returns a ResourceRequest of the resources,
traits, and aggregates it represents.
:param context: The request context.
:param container_obj: A Container object.
:return: A ResourceRequest object.
:raises NoValidHost: If the specified host/node is not found in the DB.
"""
cpu = container_obj.cpu if container_obj.cpu else CONF.default_cpu
# NOTE(hongbin): Container is allowed to take partial core (i.e. 0.1)
# but placement doesn't support it. Therefore, we take the ceil of
# the number.
cpu = int(math.ceil(cpu))
# NOTE(hongbin): If cpu is 0, claim 1 core in placement because placement
# doesn't support cpu as 0.
cpu = cpu if cpu > 1 else 1
memory = int(container_obj.memory) if container_obj.memory else \
CONF.default_memory
# NOTE(hongbin): If memory is 0, claim 1 MB in placement because placement
# doesn't support memory as 0.
memory = memory if memory > 1 else 1
container_resources = {
orc.VCPU: cpu,
orc.MEMORY_MB: memory,
}
if container_obj.disk and container_obj.disk != 0:
container_resources[orc.DISK_GB] = container_obj.disk
# Process extra_specs
if extra_specs:
res_req = ResourceRequest.from_extra_specs(extra_specs)
# If any of the three standard resources above was explicitly given in
# the extra_specs - in any group - we need to replace it, or delete it
# if it was given as zero. We'll do this by grabbing a merged version
# of the ResourceRequest resources and removing matching items from the
# container_resources.
container_resources = {rclass: amt
for rclass, amt in container_resources.items()
if rclass not in res_req.merged_resources()}
# Now we don't need (or want) any remaining zero entries - remove them.
res_req.strip_zeros()
numbered_groups = res_req.get_num_of_numbered_groups()
else:
# Start with an empty one
res_req = ResourceRequest()
numbered_groups = 0
# Add the (remaining) items from the container_resources to the
# sharing group
for rclass, amount in container_resources.items():
res_req.get_request_group(None).resources[rclass] = amount
requested_resources = extra_specs.get('requested_resources', [])
for group in requested_resources:
res_req.add_request_group(group)
# Don't limit allocation candidates when using affinity/anti-affinity.
if (extra_specs.get('hints') and any(
key in ['group', 'same_host', 'different_host']
for key in extra_specs.get('hints'))):
res_req._limit = None
if res_req.get_num_of_numbered_groups() >= 2 and not res_req.group_policy:
LOG.warning(
"There is more than one numbered request group in the "
"allocation candidate query but the container did not specify "
"any group policy. This query would fail in placement due to "
"the missing group policy. If you specified more than one "
"numbered request group in the extra_spec then you need to "
"specify the group policy in the extra_spec. If it is OK "
"to let these groups be satisfied by overlapping resource "
"providers then use 'group_policy': 'none'. If you want each "
"group to be satisfied from a separate resource provider then "
"use 'group_policy': 'isolate'.")
if numbered_groups <= 1:
LOG.info(
"At least one numbered request group is defined outside of "
"the container (e.g. in a port that has a QoS minimum "
"bandwidth policy rule attached) but the flavor did not "
"specify any group policy. To avoid the placement failure "
"nova defaults the group policy to 'none'.")
res_req.group_policy = 'none'
return res_req
def claim_resources(ctx, client, container, alloc_req,
allocation_request_version=None):
"""Given a container and the
allocation_request JSON object returned from Placement, attempt to claim
resources for the container in the placement API. Returns True if the claim
process was successful, False otherwise.
:param ctx: The RequestContext object
:param client: The scheduler client to use for making the claim call
:param container: The consuming container
:param alloc_req: The allocation_request received from placement for the
resources we want to claim against the chosen host. The
allocation_request satisfies the original request for
resources and can be supplied as-is (along with the
project and user ID to the placement API's PUT
/allocations/{consumer_uuid} call to claim resources for
the container
:param allocation_request_version: The microversion used to request the
allocations.
"""
LOG.debug("Attempting to claim resources in the placement API for "
"container %s", container.uuid)
project_id = container.project_id
user_id = container.user_id
container_uuid = container.uuid
# NOTE(gibi): this could raise AllocationUpdateFailed which means there is
# a serious issue with the container_uuid as a consumer. Every caller of
# utils.claim_resources() assumes that container_uuid will be a new
# consumer and therefore we passing None as expected consumer_generation to
# reportclient.claim_resources() here. If the claim fails
# due to consumer generation conflict, which in this case means the
# consumer is not new, then we let the AllocationUpdateFailed propagate and
# fail the build / migrate as the instance is in inconsistent state.
return client.claim_resources(
ctx, container_uuid, alloc_req, project_id, user_id,
allocation_request_version=allocation_request_version,
consumer_generation=None)

View File

@ -14,6 +14,8 @@
# ceilometer/tests/api/__init__.py). This should be oslo'ified:
# https://bugs.launchpad.net/ironic/+bug/1255115.
import mock
# NOTE(deva): import auth_token so we can override a config option
from keystonemiddleware import auth_token # noqa
import pecan
@ -43,6 +45,9 @@ class FunctionalTest(base.DbTestCase):
group='keystone_authtoken')
zun.conf.CONF.set_override("admin_user", "admin",
group='keystone_authtoken')
p = mock.patch('zun.scheduler.client.query.SchedulerClient')
p.start()
self.addCleanup(p.stop)
# Determine where we are so we can set up paths in the config
root_dir = self.get_path()

View File

@ -377,7 +377,7 @@ class TestContainerController(api_base.FunctionalTest):
self.assertIsNotNone(c.get('uuid'))
self.assertIsNotNone(c.get('name'))
self.assertFalse(c.get('command'))
self.assertEqual('2048', c.get('memory'))
self.assertEqual('512', c.get('memory'))
self.assertEqual(1.0, c.get('cpu'))
# TODO(kiennt): Uncomment it when bug [1] be resolved.
# At this time, limit disk size feature will be ready.
@ -418,7 +418,7 @@ class TestContainerController(api_base.FunctionalTest):
self.assertIsNotNone(c.get('uuid'))
self.assertEqual('MyDocker', c.get('name'))
self.assertEqual(["env"], c.get('command'))
self.assertEqual('2048', c.get('memory'))
self.assertEqual('512', c.get('memory'))
self.assertEqual(1.0, c.get('cpu'))
# TODO(kiennt): Uncomment it when bug [1] be resolved.
# At this time, limit disk size feature will be ready.

View File

@ -31,6 +31,10 @@ class TestAPI(base.TestCase):
def setUp(self):
super(TestAPI, self).setUp()
p = mock.patch('zun.scheduler.client.query.SchedulerClient')
p.start()
self.addCleanup(p.stop)
self.compute_api = api.API(self.context)
self.container = objects.Container(
self.context, **utils.get_test_container())

View File

@ -95,6 +95,10 @@ class TestManager(base.TestCase):
def setUp(self):
super(TestManager, self).setUp()
p = mock.patch('zun.scheduler.client.report.SchedulerReportClient')
p.start()
self.addCleanup(p.stop)
zun.conf.CONF.set_override(
'container_driver',
'zun.tests.unit.container.fake_driver.FakeDriver')

View File

@ -70,6 +70,7 @@ class DbPciDeviceTestCase(base.DbTestCase, base.ModelsObjectComparatorMixin):
self._compute_node = dbapi.create_compute_node(
self.admin_context,
{'uuid': uuidsentinel.compute_node,
'rp_uuid': uuidsentinel.compute_node,
'hostname': 'fake_compute_node',
'mem_total': 40960,
'mem_free': 20480,

View File

@ -361,6 +361,8 @@ def get_test_numa_topology(**kwargs):
def get_test_compute_node(**kwargs):
return {
'uuid': kwargs.get('uuid', '24a5b17a-f2eb-4556-89db-5f4169d13982'),
'rp_uuid': kwargs.get('rp_uuid',
'24a5b17a-f2eb-4556-89db-5f4169d13982'),
'hostname': kwargs.get('hostname', 'localhost'),
'numa_topology': kwargs.get('numa_topology', get_test_numa_topology()),
'mem_total': kwargs.get('mem_total', 1024),

View File

@ -359,7 +359,7 @@ object_data = {
'ResourceProvider': '1.0-92b427359d5a4cf9ec6c72cbe630ee24',
'ZunService': '1.2-deff2a74a9ce23baa231ae12f39a6189',
'PciDevice': '1.1-6e3f0851ad1cf12583e6af4df1883979',
'ComputeNode': '1.13-3c122f455c38d3665d327c05d2df6617',
'ComputeNode': '1.14-5cf09346721129068d1f72482309276f',
'PciDevicePool': '1.0-3f5ddc3ff7bfa14da7f6c7e9904cc000',
'PciDevicePoolList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'Quota': '1.3-fcaaaf4b6e983207edba27a1cf8e51ab',

View File

@ -10,13 +10,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import mock
from oslo_config import cfg
from zun import objects
from zun.scheduler.client import query as scheduler_client
from zun.scheduler import filter_scheduler
from zun.tests import base
from zun.tests.unit.db import utils
from zun.tests.unit.scheduler import fakes
@ -27,6 +30,12 @@ class SchedulerClientTestCase(base.TestCase):
def setUp(self):
super(SchedulerClientTestCase, self).setUp()
self.mock_placement_client = mock.Mock()
p = mock.patch('zun.scheduler.client.report.SchedulerReportClient',
return_value=self.mock_placement_client)
p.start()
self.addCleanup(p.stop)
self.client_cls = scheduler_client.SchedulerClient
self.client = self.client_cls()
@ -42,6 +51,26 @@ class SchedulerClientTestCase(base.TestCase):
@mock.patch('zun.scheduler.filter_scheduler.FilterScheduler'
'.select_destinations')
def test_select_destinations(self, mock_select_destinations):
fake_args = ['ctxt', 'fake_containers', 'fake_extra_spec']
mock_alloc_req = {
"allocations": {
mock.sentinel.rp_uuid: [mock.sentinel.alloc_req]
}
}
mock_provider_summaries = {
mock.sentinel.rp_uuid: {}
}
self.mock_placement_client.get_allocation_candidates.return_value = (
[mock_alloc_req], mock_provider_summaries,
mock.sentinel.alloc_request_version
)
alloc_reqs_by_rp_uuid = collections.defaultdict(list)
alloc_reqs_by_rp_uuid[mock.sentinel.rp_uuid] = [mock_alloc_req]
containers = [objects.Container(self.context,
**utils.get_test_container())]
extra_spec = {}
fake_args = ['ctxt', containers, extra_spec]
self.client.select_destinations(*fake_args)
mock_select_destinations.assert_called_once_with(*fake_args)
mock_select_destinations.assert_called_once_with(
'ctxt', containers, extra_spec, alloc_reqs_by_rp_uuid,
mock_provider_summaries, mock.sentinel.alloc_request_version)

View File

@ -298,6 +298,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
self.ks_adap_mock.put.assert_called_once_with(
expected_url, microversion='1.12', json=expected_payload,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertTrue(res)
@ -343,6 +344,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
self.ks_adap_mock.put.assert_called_once_with(
expected_url, microversion='1.12', json=expected_payload,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertTrue(res)
@ -410,6 +412,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
self.ks_adap_mock.put.assert_called_once_with(
expected_url, microversion='1.28', json=mock.ANY,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
# We have to pull the json body from the mock call_args to validate
# it separately otherwise hash seed issues get in the way.
@ -491,6 +494,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
self.ks_adap_mock.put.assert_called_once_with(
expected_url, microversion='1.28', json=mock.ANY,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
# We have to pull the json body from the mock call_args to validate
# it separately otherwise hash seed issues get in the way.
@ -579,6 +583,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
self.ks_adap_mock.put.assert_called_once_with(
expected_url, microversion='1.28', json=mock.ANY,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
# We have to pull the json body from the mock call_args to validate
# it separately otherwise hash seed issues get in the way.
@ -679,6 +684,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
self.ks_adap_mock.put.assert_called_once_with(
expected_url, microversion='1.28', json=mock.ANY,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
# We have to pull the json body from the mock call_args to validate
# it separately otherwise hash seed issues get in the way.
@ -770,6 +776,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
self.ks_adap_mock.put.assert_called_once_with(
expected_url, microversion='1.28', json=mock.ANY,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
# We have to pull the json body from the mock call_args to validate
# it separately otherwise hash seed issues get in the way.
@ -874,6 +881,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
self.ks_adap_mock.put.assert_called_once_with(
expected_url, microversion='1.28', json=mock.ANY,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
# We have to pull the json body from the mock call_args to validate
# it separately otherwise hash seed issues get in the way.
@ -931,6 +939,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
expected_calls = [
mock.call(expected_url, microversion='1.28', json=expected_payload,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id':
self.context.global_id})] * 2
self.assertEqual(len(expected_calls),
@ -984,6 +993,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
self.ks_adap_mock.put.assert_called_once_with(
expected_url, microversion='1.28', json=expected_payload,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertFalse(res)
@ -1033,6 +1043,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
self.ks_adap_mock.put.assert_called_once_with(
expected_url, microversion='1.28', json=expected_payload,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
def test_remove_provider_from_inst_alloc_no_shared(self):
@ -1105,6 +1116,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
self.ks_adap_mock.put.assert_called_once_with(
expected_url, microversion='1.28', json=mock.ANY,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertTrue(res)
@ -1192,6 +1204,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
self.ks_adap_mock.put.assert_called_once_with(
expected_url, microversion='1.28', json=mock.ANY,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertTrue(res)
@ -1376,6 +1389,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
mock.call(
'/allocations/%s' % consumer_uuid,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers=mock.ANY,
microversion='1.28'
),
@ -1383,6 +1397,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
'/resource_providers?in_tree=%s' % uuids.source_compute,
headers=mock.ANY,
endpoint_filter=mock.ANY,
logger=mock.ANY,
microversion='1.14'
)
],
@ -1395,6 +1410,7 @@ class TestPutAllocations(SchedulerReportClientTestCase):
self.ks_adap_mock.put.assert_called_once_with(
expected_url, microversion='1.28', json=mock.ANY,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertTrue(res)
@ -1988,6 +2004,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
expected_query)
self.ks_adap_mock.get.assert_called_once_with(
expected_url, microversion='1.31', endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertEqual(mock.sentinel.alloc_reqs, alloc_reqs)
self.assertEqual(mock.sentinel.p_sums, p_sums)
@ -2028,6 +2045,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
self.assertEqual(mock.sentinel.alloc_reqs, alloc_reqs)
self.ks_adap_mock.get.assert_called_once_with(
expected_url, microversion='1.31', endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertEqual(mock.sentinel.p_sums, p_sums)
@ -2050,6 +2068,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
self.ks_adap_mock.get.assert_called_once_with(
mock.ANY, microversion='1.31', endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
url = self.ks_adap_mock.get.call_args[0][0]
split_url = parse.urlsplit(url)
@ -2083,6 +2102,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
expected_url = '/resource_providers/' + uuid
self.ks_adap_mock.get.assert_called_once_with(
expected_url, microversion='1.14', endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertEqual(expected_provider_dict, result)
@ -2098,6 +2118,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
expected_url = '/resource_providers/' + uuid
self.ks_adap_mock.get.assert_called_once_with(
expected_url, microversion='1.14', endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertIsNone(result)
@ -2119,6 +2140,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
expected_url = '/resource_providers/' + uuid
self.ks_adap_mock.get.assert_called_once_with(
expected_url, microversion='1.14', endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
# A 503 Service Unavailable should trigger an error log that
# includes the placement request id and return None
@ -2158,6 +2180,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
'&required=MISC_SHARES_VIA_AGGREGATE')
self.ks_adap_mock.get.assert_called_once_with(
expected_url, microversion='1.18', endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertEqual(rpjson, result)
@ -2184,6 +2207,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
'&required=MISC_SHARES_VIA_AGGREGATE')
self.ks_adap_mock.get.assert_called_once_with(
expected_url, microversion='1.18', endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
# A 503 Service Unavailable should trigger an error log that
# includes the placement request id
@ -2219,6 +2243,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
expected_url = '/resource_providers?in_tree=' + root
self.ks_adap_mock.get.assert_called_once_with(
expected_url, microversion='1.14', endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertEqual(rpjson, result)
@ -2239,6 +2264,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
expected_url = '/resource_providers?in_tree=' + uuid
self.ks_adap_mock.get.assert_called_once_with(
expected_url, microversion='1.14', endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
# A 503 Service Unavailable should trigger an error log that includes
# the placement request id
@ -2273,7 +2299,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
expected_url = '/resource_providers'
self.ks_adap_mock.post.assert_called_once_with(
expected_url, json=expected_payload, microversion='1.20',
endpoint_filter=mock.ANY,
endpoint_filter=mock.ANY, logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
def test_create_resource_provider_with_parent(self):
@ -2304,7 +2330,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
expected_url = '/resource_providers'
self.ks_adap_mock.post.assert_called_once_with(
expected_url, json=expected_payload, microversion='1.20',
endpoint_filter=mock.ANY,
endpoint_filter=mock.ANY, logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
@mock.patch.object(report.LOG, 'info')
@ -2335,7 +2361,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
expected_url = '/resource_providers'
self.ks_adap_mock.post.assert_called_once_with(
expected_url, json=expected_payload, microversion='1.20',
endpoint_filter=mock.ANY,
endpoint_filter=mock.ANY, logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertEqual(mock.sentinel.get_rp, result)
# The 409 response will produce a message to the info log.
@ -2376,7 +2402,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
expected_url = '/resource_providers'
self.ks_adap_mock.post.assert_called_once_with(
expected_url, json=expected_payload, microversion='1.20',
endpoint_filter=mock.ANY,
endpoint_filter=mock.ANY, logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
# A 503 Service Unavailable should log an error that
# includes the placement request id and
@ -2393,7 +2419,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
self.client.put(url, [])
self.ks_adap_mock.put.assert_called_once_with(
url, json=[], microversion=None, endpoint_filter=mock.ANY,
headers={})
logger=mock.ANY, headers={})
def test_delete_provider(self):
delete_mock = fake_requests.FakeResponse(None)
@ -2411,6 +2437,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
self.ks_adap_mock.delete.assert_called_once_with(
'/resource_providers/' + uuids.root,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': 'gri'}, microversion=None)
self.assertFalse(self.client._provider_tree.exists(uuids.root))
self.assertNotIn(uuids.root, self.client._association_refresh_time)
@ -2428,7 +2455,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
self.assertRaises(exc, self.client._delete_provider, uuids.root)
self.ks_adap_mock.delete.assert_called_once_with(
'/resource_providers/' + uuids.root, microversion=None,
endpoint_filter=mock.ANY, headers={})
endpoint_filter=mock.ANY, logger=mock.ANY, headers={})
self.ks_adap_mock.delete.reset_mock()
@ -2450,7 +2477,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
'resource_provider_generation': 0}
self.ks_adap_mock.put.assert_called_once_with(
'/resource_providers/%s/aggregates' % uuids.rp, json=exp_payload,
microversion='1.19', endpoint_filter=mock.ANY,
microversion='1.19', endpoint_filter=mock.ANY, logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
# Cache was updated
ptree_data = self.client._provider_tree.data(uuids.rp)
@ -2512,7 +2539,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
'resource_provider_generation': 4}
self.ks_adap_mock.put.assert_called_once_with(
'/resource_providers/%s/aggregates' % uuids.rp, json=exp_payload,
microversion='1.19', endpoint_filter=mock.ANY,
microversion='1.19', endpoint_filter=mock.ANY, logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
# Cache was updated
ptree_data = self.client._provider_tree.data(uuids.rp)
@ -2591,6 +2618,7 @@ class TestAggregates(SchedulerReportClientTestCase):
expected_url = '/resource_providers/' + uuid + '/aggregates'
self.ks_adap_mock.get.assert_called_once_with(
expected_url, microversion='1.19', endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertEqual(set(aggs), result)
self.assertEqual(42, gen)
@ -2614,6 +2642,7 @@ class TestAggregates(SchedulerReportClientTestCase):
expected_url = '/resource_providers/' + uuid + '/aggregates'
self.ks_adap_mock.get.assert_called_once_with(
expected_url, microversion='1.19', endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertTrue(log_mock.called)
self.assertEqual(uuids.request_id,
@ -2642,6 +2671,7 @@ class TestTraits(SchedulerReportClientTestCase):
self.ks_adap_mock.get.assert_called_once_with(
expected_url,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id},
**self.trait_api_kwargs)
self.assertEqual(set(traits), result)
@ -2667,6 +2697,7 @@ class TestTraits(SchedulerReportClientTestCase):
self.ks_adap_mock.get.assert_called_once_with(
expected_url,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id},
**self.trait_api_kwargs)
self.assertTrue(log_mock.called)
@ -2685,6 +2716,7 @@ class TestTraits(SchedulerReportClientTestCase):
self.ks_adap_mock.get.assert_called_once_with(
expected_url,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id},
**self.trait_api_kwargs)
@ -2705,11 +2737,13 @@ class TestTraits(SchedulerReportClientTestCase):
self.ks_adap_mock.get.assert_called_once_with(
'/traits?name=in:' + ','.join(all_traits),
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id},
**self.trait_api_kwargs)
self.ks_adap_mock.put.assert_has_calls(
[mock.call('/traits/' + trait,
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={
'X-Openstack-Request-Id': self.context.global_id},
**self.trait_api_kwargs)
@ -2723,6 +2757,7 @@ class TestTraits(SchedulerReportClientTestCase):
self.ks_adap_mock.get.assert_called_once_with(
'/traits?name=in:' + ','.join(standard_traits),
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id},
**self.trait_api_kwargs)
self.ks_adap_mock.put.assert_not_called()
@ -2745,6 +2780,7 @@ class TestTraits(SchedulerReportClientTestCase):
self.ks_adap_mock.get.assert_called_once_with(
'/traits?name=in:FOO',
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id},
**self.trait_api_kwargs)
self.ks_adap_mock.put.assert_not_called()
@ -2762,11 +2798,13 @@ class TestTraits(SchedulerReportClientTestCase):
self.ks_adap_mock.get.assert_called_once_with(
'/traits?name=in:FOO',
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id},
**self.trait_api_kwargs)
self.ks_adap_mock.put.assert_called_once_with(
'/traits/FOO',
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id},
**self.trait_api_kwargs)
@ -2794,11 +2832,13 @@ class TestTraits(SchedulerReportClientTestCase):
self.ks_adap_mock.get.assert_called_once_with(
'/traits?name=in:' + ','.join(traits),
endpoint_filter=mock.ANY,
logger=mock.ANY,
headers={'X-Openstack-Request-Id': self.context.global_id},
**self.trait_api_kwargs)
self.ks_adap_mock.put.assert_called_once_with(
'/resource_providers/%s/traits' % uuids.rp,
endpoint_filter=mock.ANY,
logger=mock.ANY,
json={'traits': traits, 'resource_provider_generation': 0},
headers={'X-Openstack-Request-Id': self.context.global_id},
**self.trait_api_kwargs)
@ -3028,6 +3068,7 @@ class TestAllocations(SchedulerReportClientTestCase):
self, mock_by_host, mock_del_alloc, mock_delete):
self.client._provider_tree.new_root(uuids.cn, uuids.cn, generation=1)
cn = utils.get_test_compute_node(self.context, uuid=uuids.cn,
rp_uuid=uuids.cn,
hostname="fake_hostname")
cont1 = utils.get_test_container(self.context, uuid=uuids.inst1)
cont2 = utils.get_test_container(self.context, uuid=uuids.inst2)
@ -3052,6 +3093,7 @@ class TestAllocations(SchedulerReportClientTestCase):
self.client._provider_tree.new_root(uuids.cn, uuids.cn, generation=1)
self.client._association_refresh_time[uuids.cn] = mock.Mock()
cn = utils.get_test_compute_node(self.context, uuid=uuids.cn,
rp_uuid=uuids.cn,
hostname="fake_hostname")
cont1 = utils.get_test_container(self.context, uuid=uuids.inst1)
cont2 = utils.get_test_container(self.context, uuid=uuids.inst2)

View File

@ -12,6 +12,8 @@
import mock
from oslo_utils import timeutils
from zun.api import servicegroup
from zun.common import context
from zun.common import exception
@ -29,6 +31,11 @@ class FilterSchedulerTestCase(base.TestCase):
def setUp(self):
super(FilterSchedulerTestCase, self).setUp()
self.mock_placement_client = mock.Mock()
p = mock.patch('zun.scheduler.client.report.SchedulerReportClient',
return_value=self.mock_placement_client)
p.start()
self.addCleanup(p.stop)
self.context = context.RequestContext('fake_user', 'fake_project')
self.driver = self.driver_cls()
@ -72,6 +79,8 @@ class FilterSchedulerTestCase(base.TestCase):
test_container = utils.get_test_container()
containers = [objects.Container(self.context, **test_container)]
node1 = objects.ComputeNode(self.context)
node1.rp_uuid = mock.sentinel.node1_rp_uuid
node1.updated_at = timeutils.utcnow()
node1.cpus = 48
node1.cpu_used = 0.0
node1.mem_total = 1024 * 128
@ -88,6 +97,8 @@ class FilterSchedulerTestCase(base.TestCase):
node1.runtimes = ['runc']
node1.enable_cpu_pinning = False
node2 = objects.ComputeNode(self.context)
node2.rp_uuid = mock.sentinel.node2_rp_uuid
node2.updated_at = timeutils.utcnow()
node2.cpus = 48
node2.cpu_used = 0.0
node2.mem_total = 1024 * 128
@ -104,6 +115,8 @@ class FilterSchedulerTestCase(base.TestCase):
node2.runtimes = ['runc']
node2.enable_cpu_pinning = False
node3 = objects.ComputeNode(self.context)
node3.rp_uuid = mock.sentinel.node3_rp_uuid
node3.updated_at = timeutils.utcnow()
node3.cpus = 48
node3.cpu_used = 0.0
node3.mem_total = 1024 * 128
@ -120,6 +133,8 @@ class FilterSchedulerTestCase(base.TestCase):
node3.runtimes = ['runc']
node3.enable_cpu_pinning = False
node4 = objects.ComputeNode(self.context)
node4.rp_uuid = mock.sentinel.node4_rp_uuid
node4.updated_at = timeutils.utcnow()
node4.cpus = 48
node4.cpu_used = 0.0
node4.mem_total = 1024 * 128
@ -138,18 +153,30 @@ class FilterSchedulerTestCase(base.TestCase):
nodes = [node1, node2, node3, node4]
mock_compute_list.return_value = nodes
def side_effect(hosts):
return hosts[2]
mock_random_choice.side_effect = side_effect
mock_service_is_up.return_value = True
extra_spec = {}
dests = self.driver.select_destinations(self.context, containers,
extra_spec)
mock_alloc_reqs_by_rp_uuid = {
node3.rp_uuid: [mock.sentinel.node3_alloc_req]
}
mock_provider_summaries = {
node3.rp_uuid: {}
}
dests = self.driver.select_destinations(
self.context, containers, extra_spec,
mock_alloc_reqs_by_rp_uuid, mock_provider_summaries,
mock.sentinel.alloc_request_version)
self.assertEqual(1, len(dests))
(host, node) = (dests[0]['host'], dests[0]['nodename'])
self.assertEqual('host3', host)
self.assertIsNone(node)
container = containers[0]
self.mock_placement_client.claim_resources.assert_called_once_with(
mock.ANY, container.uuid, mock.sentinel.node3_alloc_req,
container.project_id, container.user_id,
allocation_request_version=mock.sentinel.alloc_request_version,
consumer_generation=None)
@mock.patch.object(objects.ComputeNode, 'list')
@mock.patch.object(objects.ZunService, 'list_by_binary')
@ -167,6 +194,10 @@ class FilterSchedulerTestCase(base.TestCase):
test_container = utils.get_test_container()
containers = [objects.Container(self.context, **test_container)]
extra_spec = {}
mock_alloc_reqs_by_rp_uuid = {}
mock_provider_summaries = {}
self.assertRaises(exception.NoValidHost,
self.driver.select_destinations, self.context,
containers, extra_spec)
containers, extra_spec, mock_alloc_reqs_by_rp_uuid,
mock_provider_summaries,
mock.sentinel.alloc_request_version)