Merge "Add 'hw:cpu_thread_policy=require' scheduling"

This commit is contained in:
Jenkins 2016-01-13 02:57:09 +00:00 committed by Gerrit Code Review
commit 34cf19c8a4
5 changed files with 262 additions and 59 deletions

View File

@ -13,6 +13,7 @@
from oslo_log import log as logging
from nova import objects
from nova.objects import fields
from nova.scheduler import filters
from nova.virt import hardware
@ -22,15 +23,57 @@ LOG = logging.getLogger(__name__)
class NUMATopologyFilter(filters.BaseHostFilter):
"""Filter on requested NUMA topology."""
def _satisfies_cpu_policy(self, host_state, extra_specs, image_props):
"""Check that the host_state provided satisfies any available
CPU policy requirements.
"""
host_topology, _ = hardware.host_topology_and_format_from_host(
host_state)
# NOTE(stephenfin): There can be conflicts between the policy
# specified by the image and that specified by the instance, but this
# is not the place to resolve these. We do this during scheduling.
cpu_policy = [extra_specs.get('hw:cpu_policy'),
image_props.get('hw_cpu_policy')]
cpu_thread_policy = [extra_specs.get('hw:cpu_thread_policy'),
image_props.get('hw_cpu_thread_policy')]
if not host_topology:
return True
if fields.CPUAllocationPolicy.DEDICATED not in cpu_policy:
return True
if fields.CPUThreadAllocationPolicy.REQUIRE not in cpu_thread_policy:
return True
for cell in host_topology.cells:
# the presence of siblings indicates hyperthreading (HT)
if not cell.siblings:
LOG.debug("%(host_state)s fails CPU policy requirements. "
"Host does not have hyperthreading or "
"hyperthreading is disabled, but 'require' threads "
"policy was requested.", {'host_state': host_state})
return False
return True
def host_passes(self, host_state, spec_obj):
ram_ratio = host_state.ram_allocation_ratio
cpu_ratio = host_state.cpu_allocation_ratio
extra_specs = spec_obj.flavor.extra_specs
image_props = spec_obj.image.properties
requested_topology = spec_obj.numa_topology
host_topology, _fmt = hardware.host_topology_and_format_from_host(
host_state)
pci_requests = spec_obj.pci_requests
if pci_requests:
pci_requests = pci_requests.requests
if not self._satisfies_cpu_policy(host_state, extra_specs,
image_props):
return False
if requested_topology and host_topology:
limits = objects.NUMATopologyLimits(
cpu_allocation_ratio=cpu_ratio,

View File

@ -32,6 +32,17 @@ NUMA_TOPOLOGY = objects.NUMATopology(
cpu_usage=0, memory_usage=0, mempages=[],
siblings=[], pinned_cpus=set([]))])
NUMA_TOPOLOGY_W_HT = objects.NUMATopology(cells=[
objects.NUMACell(
id=0, cpuset=set([1, 2, 5, 6]), memory=512,
cpu_usage=0, memory_usage=0, mempages=[],
siblings=[set([1, 5]), set([2, 6])], pinned_cpus=set([])),
objects.NUMACell(
id=1, cpuset=set([3, 4, 7, 8]), memory=512,
cpu_usage=0, memory_usage=0, mempages=[],
siblings=[set([3, 4]), set([7, 8])], pinned_cpus=set([]))
])
COMPUTE_NODES = [
objects.ComputeNode(
id=1, local_gb=1024, memory_mb=1024, vcpus=1,

View File

@ -10,9 +10,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import uuid
from nova import objects
from nova.objects import fields
from nova.scheduler.filters import numa_topology_filter
from nova import test
from nova.tests.unit.scheduler import fakes
@ -24,14 +26,21 @@ class TestNUMATopologyFilter(test.NoDBTestCase):
super(TestNUMATopologyFilter, self).setUp()
self.filt_cls = numa_topology_filter.NUMATopologyFilter()
def _get_spec_obj(self, numa_topology):
image_meta = objects.ImageMeta(properties=objects.ImageMetaProps())
spec_obj = objects.RequestSpec(numa_topology=numa_topology,
pci_requests=None,
instance_uuid=str(uuid.uuid4()),
flavor=objects.Flavor(extra_specs={}),
image=image_meta)
return spec_obj
def test_numa_topology_filter_pass(self):
instance_topology = objects.InstanceNUMATopology(
cells=[objects.InstanceNUMACell(id=0, cpuset=set([1]), memory=512),
objects.InstanceNUMACell(id=1, cpuset=set([3]), memory=512)
])
spec_obj = objects.RequestSpec(numa_topology=instance_topology,
pci_requests=None,
instance_uuid=str(uuid.uuid4()))
spec_obj = self._get_spec_obj(numa_topology=instance_topology)
host = fakes.FakeHostState('host1', 'node1',
{'numa_topology': fakes.NUMA_TOPOLOGY,
'pci_stats': None,
@ -45,16 +54,12 @@ class TestNUMATopologyFilter(test.NoDBTestCase):
objects.InstanceNUMACell(id=1, cpuset=set([3]), memory=512)
])
spec_obj = objects.RequestSpec(numa_topology=instance_topology,
pci_requests=None,
instance_uuid=str(uuid.uuid4()))
spec_obj = self._get_spec_obj(numa_topology=instance_topology)
host = fakes.FakeHostState('host1', 'node1', {'pci_stats': None})
self.assertFalse(self.filt_cls.host_passes(host, spec_obj))
def test_numa_topology_filter_numa_host_no_numa_instance_pass(self):
spec_obj = objects.RequestSpec(numa_topology=None,
pci_requests=None,
instance_uuid=str(uuid.uuid4()))
spec_obj = self._get_spec_obj(numa_topology=None)
host = fakes.FakeHostState('host1', 'node1',
{'numa_topology': fakes.NUMA_TOPOLOGY})
self.assertTrue(self.filt_cls.host_passes(host, spec_obj))
@ -65,9 +70,7 @@ class TestNUMATopologyFilter(test.NoDBTestCase):
objects.InstanceNUMACell(id=1, cpuset=set([2]), memory=512),
objects.InstanceNUMACell(id=2, cpuset=set([3]), memory=512)
])
spec_obj = objects.RequestSpec(numa_topology=instance_topology,
pci_requests=None,
instance_uuid=str(uuid.uuid4()))
spec_obj = self._get_spec_obj(numa_topology=instance_topology)
host = fakes.FakeHostState('host1', 'node1',
{'numa_topology': fakes.NUMA_TOPOLOGY,
'pci_stats': None,
@ -81,9 +84,7 @@ class TestNUMATopologyFilter(test.NoDBTestCase):
memory=1024),
objects.InstanceNUMACell(id=1, cpuset=set([3]), memory=512)
])
spec_obj = objects.RequestSpec(numa_topology=instance_topology,
pci_requests=None,
instance_uuid=str(uuid.uuid4()))
spec_obj = self._get_spec_obj(numa_topology=instance_topology)
host = fakes.FakeHostState('host1', 'node1',
{'numa_topology': fakes.NUMA_TOPOLOGY,
'pci_stats': None,
@ -96,9 +97,7 @@ class TestNUMATopologyFilter(test.NoDBTestCase):
cells=[objects.InstanceNUMACell(id=0, cpuset=set([1]), memory=512),
objects.InstanceNUMACell(id=1, cpuset=set([3, 4, 5]),
memory=512)])
spec_obj = objects.RequestSpec(numa_topology=instance_topology,
pci_requests=None,
instance_uuid=str(uuid.uuid4()))
spec_obj = self._get_spec_obj(numa_topology=instance_topology)
host = fakes.FakeHostState('host1', 'node1',
{'numa_topology': fakes.NUMA_TOPOLOGY,
'pci_stats': None,
@ -111,9 +110,7 @@ class TestNUMATopologyFilter(test.NoDBTestCase):
cells=[objects.InstanceNUMACell(id=0, cpuset=set([1]), memory=512),
objects.InstanceNUMACell(id=1, cpuset=set([3]), memory=512)
])
spec_obj = objects.RequestSpec(numa_topology=instance_topology,
pci_requests=None,
instance_uuid=str(uuid.uuid4()))
spec_obj = self._get_spec_obj(numa_topology=instance_topology)
host = fakes.FakeHostState('host1', 'node1',
{'numa_topology': fakes.NUMA_TOPOLOGY,
'pci_stats': None,
@ -123,3 +120,76 @@ class TestNUMATopologyFilter(test.NoDBTestCase):
limits = host.limits['numa_topology']
self.assertEqual(limits.cpu_allocation_ratio, 21)
self.assertEqual(limits.ram_allocation_ratio, 1.3)
def _do_test_numa_topology_filter_cpu_policy(
self, numa_topology, cpu_policy, cpu_thread_policy, passes):
instance_topology = objects.InstanceNUMATopology(
cells=[objects.InstanceNUMACell(id=0, cpuset=set([1]), memory=512),
objects.InstanceNUMACell(id=1, cpuset=set([3]), memory=512)
])
spec_obj = objects.RequestSpec(numa_topology=instance_topology,
pci_requests=None,
instance_uuid=str(uuid.uuid4()))
extra_specs = [
{},
{
'hw:cpu_policy': cpu_policy,
'hw:cpu_thread_policy': cpu_thread_policy,
}
]
image_props = [
{},
{
'hw_cpu_policy': cpu_policy,
'hw_cpu_thread_policy': cpu_thread_policy,
}
]
host = fakes.FakeHostState('host1', 'node1', {
'numa_topology': numa_topology,
'pci_stats': None,
'cpu_allocation_ratio': 1,
'ram_allocation_ratio': 1.5})
assertion = self.assertTrue if passes else self.assertFalse
# test combinations of image properties and extra specs
for specs, props in itertools.product(extra_specs, image_props):
# ...except for the one where no policy is specified
if specs == props == {}:
continue
fake_flavor = objects.Flavor(memory_mb=1024, extra_specs=specs)
fake_image_props = objects.ImageMetaProps(**props)
fake_image = objects.ImageMeta(properties=fake_image_props)
spec_obj.image = fake_image
spec_obj.flavor = fake_flavor
assertion(self.filt_cls.host_passes(host, spec_obj))
def test_numa_topology_filter_fail_cpu_thread_policy_require(self):
cpu_policy = fields.CPUAllocationPolicy.DEDICATED
cpu_thread_policy = fields.CPUThreadAllocationPolicy.REQUIRE
numa_topology = fakes.NUMA_TOPOLOGY
self._do_test_numa_topology_filter_cpu_policy(
numa_topology, cpu_policy, cpu_thread_policy, False)
def test_numa_topology_filter_pass_cpu_thread_policy_require(self):
cpu_policy = fields.CPUAllocationPolicy.DEDICATED
cpu_thread_policy = fields.CPUThreadAllocationPolicy.REQUIRE
numa_topology = fakes.NUMA_TOPOLOGY_W_HT
self._do_test_numa_topology_filter_cpu_policy(
numa_topology, cpu_policy, cpu_thread_policy, True)
def test_numa_topology_filter_pass_cpu_thread_policy_others(self):
cpu_policy = fields.CPUAllocationPolicy.DEDICATED
cpu_thread_policy = fields.CPUThreadAllocationPolicy.PREFER
numa_topology = fakes.NUMA_TOPOLOGY
for cpu_thread_policy in [
fields.CPUThreadAllocationPolicy.PREFER,
fields.CPUThreadAllocationPolicy.ISOLATE]:
self._do_test_numa_topology_filter_cpu_policy(
numa_topology, cpu_policy, cpu_thread_policy, True)

View File

@ -2103,6 +2103,55 @@ class CPUPinningCellTestCase(test.NoDBTestCase, _CPUPinningTestCaseBase):
got_pinning = {x: x for x in range(0, 4)}
self.assertEqual(got_pinning, inst_pin.cpu_pinning)
def test_get_pinning_require_policy_too_few_siblings(self):
host_pin = objects.NUMACell(
id=0,
cpuset=set([0, 1, 2, 3, 4, 5, 6, 7]),
memory=4096, memory_usage=0,
pinned_cpus=set([0, 1, 2]),
siblings=[set([0, 4]), set([1, 5]), set([2, 6]), set([3, 7])],
mempages=[])
inst_pin = objects.InstanceNUMACell(
cpuset=set([0, 1, 2, 3]),
memory=2048,
cpu_policy=fields.CPUAllocationPolicy.DEDICATED,
cpu_thread_policy=fields.CPUThreadAllocationPolicy.REQUIRE)
inst_pin = hw._numa_fit_instance_cell_with_pinning(host_pin, inst_pin)
self.assertIsNone(inst_pin)
def test_get_pinning_require_policy_fits(self):
host_pin = objects.NUMACell(id=0, cpuset=set([0, 1, 2, 3]),
memory=4096, memory_usage=0,
siblings=[set([0, 1]), set([2, 3])],
mempages=[], pinned_cpus=set([]))
inst_pin = objects.InstanceNUMACell(
cpuset=set([0, 1, 2, 3]),
memory=2048,
cpu_policy=fields.CPUAllocationPolicy.DEDICATED,
cpu_thread_policy=fields.CPUThreadAllocationPolicy.REQUIRE)
inst_pin = hw._numa_fit_instance_cell_with_pinning(host_pin, inst_pin)
self.assertInstanceCellPinned(inst_pin)
got_topo = objects.VirtCPUTopology(sockets=1, cores=2, threads=2)
self.assertEqualTopology(got_topo, inst_pin.cpu_topology)
def test_get_pinning_require_policy_fits_w_usage(self):
host_pin = objects.NUMACell(
id=0,
cpuset=set([0, 1, 2, 3, 4, 5, 6, 7]),
memory=4096, memory_usage=0,
pinned_cpus=set([0, 1]),
siblings=[set([0, 4]), set([1, 5]), set([2, 6]), set([3, 7])],
mempages=[])
inst_pin = objects.InstanceNUMACell(
cpuset=set([0, 1, 2, 3]),
memory=2048,
cpu_policy=fields.CPUAllocationPolicy.DEDICATED,
cpu_thread_policy=fields.CPUThreadAllocationPolicy.REQUIRE)
inst_pin = hw._numa_fit_instance_cell_with_pinning(host_pin, inst_pin)
self.assertInstanceCellPinned(inst_pin)
got_topo = objects.VirtCPUTopology(sockets=1, cores=2, threads=2)
self.assertEqualTopology(got_topo, inst_pin.cpu_topology)
def test_get_pinning_host_siblings_instance_odd_fit(self):
host_pin = objects.NUMACell(id=0, cpuset=set([0, 1, 2, 3, 4, 5, 6, 7]),
memory=4096, memory_usage=0,

View File

@ -670,31 +670,27 @@ def _pack_instance_onto_cores(available_siblings, instance_cell, host_cell_id):
topology, making sure that hyperthreads of the instance match up with
those of the host when the pinning takes effect.
Currently the strategy for packing is to prefer siblings and try use
cores evenly, by using emptier cores first. This is achieved by the way we
order cores in the can_pack structure, and the order in which we iterate
Currently the strategy for packing is to prefer siblings and try use cores
evenly, by using emptier cores first. This is achieved by the way we order
cores in the sibling_sets structure, and the order in which we iterate
through it.
The main packing loop that iterates over the can_pack dictionary will not
currently try to look for a fit that maximizes number of siblings, but will
simply rely on the iteration ordering and picking the first viable
The main packing loop that iterates over the sibling_sets dictionary will
not currently try to look for a fit that maximizes number of siblings, but
will simply rely on the iteration ordering and picking the first viable
placement.
"""
# We build up a data structure 'can_pack' that answers the question:
# 'Given the number of threads I want to pack, give me a list of all
# the available sibling sets that can accommodate it'
can_pack = collections.defaultdict(list)
# We build up a data structure that answers the question: 'Given the
# number of threads I want to pack, give me a list of all the available
# sibling sets (or groups thereof) that can accommodate it'
sibling_sets = collections.defaultdict(list)
for sib in available_siblings:
for threads_no in range(1, len(sib) + 1):
can_pack[threads_no].append(sib)
sibling_sets[threads_no].append(sib)
def _can_pack_instance_cell(instance_cell, threads_per_core, cores_list):
"""Determines if instance cell can fit an avail set of cores."""
if threads_per_core * len(cores_list) < len(instance_cell):
return False
return True
pinning = None
threads_no = 1
def _orphans(instance_cell, threads_per_core):
"""Number of instance CPUs which will not fill up a host core.
@ -702,7 +698,7 @@ def _pack_instance_onto_cores(available_siblings, instance_cell, host_cell_id):
Best explained by an example: consider set of free host cores as such:
[(0, 1), (3, 5), (6, 7, 8)]
This would be a case of 2 threads_per_core AKA an entry for 2 in the
can_pack structure.
sibling_sets structure.
If we attempt to pack a 5 core instance on it - due to the fact that we
iterate the list in order, we will end up with a single core of the
@ -733,21 +729,55 @@ def _pack_instance_onto_cores(available_siblings, instance_cell, host_cell_id):
return fractions.gcd(threads_per_core, _orphans(instance_cell,
threads_per_core))
# We iterate over the can_pack dict in descending order of cores that
# can be packed - an attempt to get even distribution over time
for cores_per_sib, sib_list in sorted(
(t for t in can_pack.items()), reverse=True):
if _can_pack_instance_cell(instance_cell,
cores_per_sib, sib_list):
sliced_sibs = map(lambda s: list(s)[:cores_per_sib], sib_list)
pinning = zip(sorted(instance_cell.cpuset),
itertools.chain(*sliced_sibs))
if (instance_cell.cpu_thread_policy ==
fields.CPUThreadAllocationPolicy.REQUIRE):
LOG.debug("Requested 'require' thread policy for %d cores",
len(instance_cell))
elif (instance_cell.cpu_thread_policy ==
fields.CPUThreadAllocationPolicy.PREFER):
LOG.debug("Request 'prefer' thread policy for %d cores",
len(instance_cell))
elif (instance_cell.cpu_thread_policy ==
fields.CPUThreadAllocationPolicy.ISOLATE):
LOG.debug("Requested 'isolate' thread policy for %d cores",
len(instance_cell))
raise NotImplementedError("The 'isolate' policy is not supported.")
else:
LOG.debug("User did not specify a thread policy. Using default "
"for %d cores", len(instance_cell))
# NOTE(ndipanov): We iterate over the sibling sets in descending order
# of cores that can be packed. This is an attempt to evenly distribute
# instances among physical cores
for threads_no, sibling_set in sorted(
(t for t in sibling_sets.items()), reverse=True):
if threads_no * len(sibling_set) < len(instance_cell):
continue
usable_cores = map(lambda s: list(s)[:threads_no], sibling_set)
threads_no = _threads(instance_cell, threads_no)
# NOTE(sfinucan): The key difference between the require and
# prefer policies is that require will not settle for non-siblings
# if this is all that is available. Enforce this by ensuring we're
# using sibling sets that contain at least one sibling
if (instance_cell.cpu_thread_policy ==
fields.CPUThreadAllocationPolicy.REQUIRE):
if threads_no <= 1:
continue
pinning = zip(sorted(instance_cell.cpuset),
itertools.chain(*usable_cores))
break
if not pinning:
return
threads = _threads(instance_cell, cores_per_sib)
cores = len(instance_cell) / threads
topology = objects.VirtCPUTopology(sockets=1,
cores=cores,
threads=threads)
cores=len(pinning) / threads_no,
threads=threads_no)
instance_cell.pin_vcpus(*pinning)
instance_cell.cpu_topology = topology
instance_cell.id = host_cell_id
@ -1063,7 +1093,6 @@ def _add_cpu_pinning_constraint(flavor, image_meta, numa_topology):
for cell in numa_topology.cells:
cell.cpu_policy = cpu_policy
cell.cpu_thread_policy = cpu_thread_policy
return numa_topology
else:
single_cell = objects.InstanceNUMACell(
id=0,
@ -1072,6 +1101,7 @@ def _add_cpu_pinning_constraint(flavor, image_meta, numa_topology):
cpu_policy=cpu_policy,
cpu_thread_policy=cpu_thread_policy)
numa_topology = objects.InstanceNUMATopology(cells=[single_cell])
return numa_topology