Merge "Add AllocatedCapacityWeigher"

This commit is contained in:
Jenkins 2013-12-31 19:59:03 +00:00 committed by Gerrit Code Review
commit fd73c5bf3d
12 changed files with 176 additions and 23 deletions

View File

@ -99,6 +99,9 @@ class HostState(object):
# Mutable available resources.
# These will change as resources are virtually "consumed".
self.total_capacity_gb = 0
# capacity has been allocated in cinder POV, which should be
# sum(vol['size'] for vol in vols_on_hosts)
self.allocated_capacity_gb = 0
self.free_capacity_gb = None
self.reserved_percentage = 0
@ -128,6 +131,8 @@ class HostState(object):
self.total_capacity_gb = capability['total_capacity_gb']
self.free_capacity_gb = capability['free_capacity_gb']
self.allocated_capacity_gb = capability.get(
'allocated_capacity_gb', 0)
self.reserved_percentage = capability['reserved_percentage']
self.updated = capability['timestamp']
@ -135,6 +140,7 @@ class HostState(object):
def consume_from_volume(self, volume):
"""Incrementally update host state from an volume."""
volume_gb = volume['size']
self.allocated_capacity_gb += volume_gb
if self.free_capacity_gb == 'infinite':
# There's virtually infinite space on back-end
pass

View File

@ -1,3 +1,4 @@
# Copyright (c) 2013 eBay Inc.
# Copyright (c) 2012 OpenStack Foundation
# All Rights Reserved.
#
@ -13,11 +14,22 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
Capacity Weigher. Weigh hosts by their available capacity.
Weighers that weigh hosts by their capacity, including following two
weighers:
1. Capacity Weigher. Weigh hosts by their available capacity.
The default is to spread volumes across all hosts evenly. If you prefer
stacking, you can set the 'capacity_weight_multiplier' option to a negative
number and the weighing has the opposite effect of the default.
2. Allocated Capacity Weigher. Weigh hosts by their allocated capacity.
The default behavior is to place new volume to the host allocated the least
space. This weigher is intended to simulate the behavior of SimpleScheduler.
If you prefer to place volumes to host allocated the most space, you can
set the 'allocated_capacity_weight_multiplier' option to a postive number
and the weighing has the opposite effect of the default.
"""
@ -33,6 +45,10 @@ capacity_weight_opts = [
default=1.0,
help='Multiplier used for weighing volume capacity. '
'Negative numbers mean to stack vs spread.'),
cfg.FloatOpt('allocated_capacity_weight_multiplier',
default=-1.0,
help='Multiplier used for weighing volume capacity. '
'Negative numbers mean to stack vs spread.'),
]
CONF = cfg.CONF
@ -55,3 +71,15 @@ class CapacityWeigher(weights.BaseHostWeigher):
else:
free = math.floor(host_state.free_capacity_gb * (1 - reserved))
return free
class AllocatedCapacityWeigher(weights.BaseHostWeigher):
def _weight_multiplier(self):
"""Override the weight multiplier."""
return CONF.allocated_capacity_weight_multiplier
def _weigh_object(self, host_state, weight_properties):
# Higher weights win. We want spreading (choose host with lowest
# allocated_capacity first) to be the default.
allocated_space = host_state.allocated_capacity_gb
return allocated_space

View File

@ -76,7 +76,7 @@ class AdminActionsTest(test.TestCase):
def test_reset_status_as_non_admin(self):
# current status is 'error'
volume = db.volume_create(context.get_admin_context(),
{'status': 'error'})
{'status': 'error', 'size': 1})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
@ -96,7 +96,7 @@ class AdminActionsTest(test.TestCase):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available'})
volume = db.volume_create(ctx, {'status': 'available', 'size': 1})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
@ -115,7 +115,7 @@ class AdminActionsTest(test.TestCase):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available'})
volume = db.volume_create(ctx, {'status': 'available', 'size': 1})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
@ -153,7 +153,8 @@ class AdminActionsTest(test.TestCase):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available',
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1,
'attach_status': 'attached'})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
@ -177,7 +178,8 @@ class AdminActionsTest(test.TestCase):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available',
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1,
'attach_status': 'detached'})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
@ -200,7 +202,8 @@ class AdminActionsTest(test.TestCase):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# snapshot in 'error_deleting'
volume = db.volume_create(ctx, {})
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1})
snapshot = db.snapshot_create(ctx, {'status': 'error_deleting',
'volume_id': volume['id']})
req = webob.Request.blank('/v2/fake/snapshots/%s/action' %
@ -222,7 +225,8 @@ class AdminActionsTest(test.TestCase):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# snapshot in 'available'
volume = db.volume_create(ctx, {})
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1})
snapshot = db.snapshot_create(ctx, {'status': 'available',
'volume_id': volume['id']})
req = webob.Request.blank('/v2/fake/snapshots/%s/action' %
@ -245,7 +249,7 @@ class AdminActionsTest(test.TestCase):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is creating
volume = db.volume_create(ctx, {'status': 'creating'})
volume = db.volume_create(ctx, {'size': 1})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
@ -263,7 +267,7 @@ class AdminActionsTest(test.TestCase):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is creating
volume = db.volume_create(ctx, {'host': 'test'})
volume = db.volume_create(ctx, {'host': 'test', 'size': 1})
snapshot = db.snapshot_create(ctx, {'status': 'creating',
'volume_size': 1,
'volume_id': volume['id']})
@ -291,7 +295,7 @@ class AdminActionsTest(test.TestCase):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
@ -346,7 +350,7 @@ class AdminActionsTest(test.TestCase):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
@ -402,7 +406,7 @@ class AdminActionsTest(test.TestCase):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
@ -438,7 +442,7 @@ class AdminActionsTest(test.TestCase):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
@ -474,7 +478,7 @@ class AdminActionsTest(test.TestCase):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
connector = {}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
@ -489,7 +493,7 @@ class AdminActionsTest(test.TestCase):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
values = {'status': 'attaching',
@ -513,7 +517,7 @@ class AdminActionsTest(test.TestCase):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
values = {'status': 'attaching',

View File

@ -34,18 +34,22 @@ class FakeHostManager(host_manager.HostManager):
self.service_states = {
'host1': {'total_capacity_gb': 1024,
'free_capacity_gb': 1024,
'allocated_capacity_gb': 0,
'reserved_percentage': 10,
'timestamp': None},
'host2': {'total_capacity_gb': 2048,
'free_capacity_gb': 300,
'allocated_capacity_gb': 1748,
'reserved_percentage': 10,
'timestamp': None},
'host3': {'total_capacity_gb': 512,
'free_capacity_gb': 512,
'free_capacity_gb': 256,
'allocated_capacity_gb': 256,
'reserved_percentage': 0,
'timestamp': None},
'host4': {'total_capacity_gb': 2048,
'free_capacity_gb': 200,
'allocated_capacity_gb': 1848,
'reserved_percentage': 5,
'timestamp': None},
}

View File

@ -0,0 +1,92 @@
# Copyright 2013 eBay Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Allocated Capacity Weigher.
"""
import mock
from oslo.config import cfg
from cinder import context
from cinder.openstack.common.scheduler.weights import HostWeightHandler
from cinder.scheduler.weights.capacity import AllocatedCapacityWeigher as ACW
from cinder import test
from cinder.tests.scheduler import fakes
CONF = cfg.CONF
class AllocatedCapacityWeigherTestCase(test.TestCase):
def setUp(self):
super(AllocatedCapacityWeigherTestCase, self).setUp()
self.host_manager = fakes.FakeHostManager()
self.weight_handler = HostWeightHandler('cinder.scheduler.weights')
def _get_weighed_host(self, hosts, weight_properties=None):
if weight_properties is None:
weight_properties = {}
return self.weight_handler.get_weighed_objects([ACW], hosts,
weight_properties)[0]
@mock.patch('cinder.db.sqlalchemy.api.service_get_all_by_topic')
def _get_all_hosts(self, _mock_service_get_all_by_topic):
ctxt = context.get_admin_context()
fakes.mock_host_manager_db_calls(_mock_service_get_all_by_topic)
host_states = self.host_manager.get_all_host_states(ctxt)
_mock_service_get_all_by_topic.assert_called_once_with(
ctxt, CONF.volume_topic)
return host_states
def test_default_of_spreading_first(self):
hostinfo_list = self._get_all_hosts()
# host1: allocated_capacity_gb=0, weight=0
# host2: allocated_capacity_gb=1748, weight=-1748
# host3: allocated_capacity_gb=256, weight=-256
# host4: allocated_capacity_gb=1848, weight=-1848
# so, host1 should win:
weighed_host = self._get_weighed_host(hostinfo_list)
self.assertEqual(weighed_host.weight, 0)
self.assertEqual(weighed_host.obj.host, 'host1')
def test_capacity_weight_multiplier1(self):
self.flags(allocated_capacity_weight_multiplier=1.0)
hostinfo_list = self._get_all_hosts()
# host1: allocated_capacity_gb=0, weight=0
# host2: allocated_capacity_gb=1748, weight=1748
# host3: allocated_capacity_gb=256, weight=256
# host4: allocated_capacity_gb=1848, weight=1848
# so, host4 should win:
weighed_host = self._get_weighed_host(hostinfo_list)
self.assertEqual(weighed_host.weight, 1848.0)
self.assertEqual(weighed_host.obj.host, 'host4')
def test_capacity_weight_multiplier2(self):
self.flags(allocated_capacity_weight_multiplier=-2.0)
hostinfo_list = self._get_all_hosts()
# host1: allocated_capacity_gb=0, weight=0
# host2: allocated_capacity_gb=1748, weight=-3496
# host3: allocated_capacity_gb=256, weight=-512
# host4: allocated_capacity_gb=1848, weight=-3696
# so, host1 should win:
weighed_host = self._get_weighed_host(hostinfo_list)
self.assertEqual(weighed_host.weight, 0)
self.assertEqual(weighed_host.obj.host, 'host1')

View File

@ -56,7 +56,7 @@ class CapacityWeigherTestCase(test.TestCase):
# host1: free_capacity_gb=1024, free=1024*(1-0.1)
# host2: free_capacity_gb=300, free=300*(1-0.1)
# host3: free_capacity_gb=512, free=512
# host3: free_capacity_gb=512, free=256
# host4: free_capacity_gb=200, free=200*(1-0.05)
# so, host1 should win:
@ -70,7 +70,7 @@ class CapacityWeigherTestCase(test.TestCase):
# host1: free_capacity_gb=1024, free=-1024*(1-0.1)
# host2: free_capacity_gb=300, free=-300*(1-0.1)
# host3: free_capacity_gb=512, free=-512
# host3: free_capacity_gb=512, free=-256
# host4: free_capacity_gb=200, free=-200*(1-0.05)
# so, host4 should win:
@ -84,7 +84,7 @@ class CapacityWeigherTestCase(test.TestCase):
# host1: free_capacity_gb=1024, free=1024*(1-0.1)*2
# host2: free_capacity_gb=300, free=300*(1-0.1)*2
# host3: free_capacity_gb=512, free=512*2
# host3: free_capacity_gb=512, free=256*2
# host4: free_capacity_gb=200, free=200*(1-0.05)*2
# so, host1 should win:

View File

@ -89,6 +89,7 @@ class GPFSDriverTestCase(test.TestCase):
self.volume = importutils.import_object(CONF.volume_manager)
self.volume.driver.set_execute(self._execute_wrapper)
self.volume.driver.set_initialized()
self.volume.stats = dict(allocated_capacity_gb=0)
self.stubs.Set(GPFSDriver, '_create_gpfs_snap',
self._fake_gpfs_snap)

View File

@ -701,6 +701,7 @@ class ManagedRBDTestCase(DriverTestCase):
super(ManagedRBDTestCase, self).setUp()
fake_image.stub_out_image_service(self.stubs)
self.volume.driver.set_initialized()
self.volume.stats = {'allocated_capacity_gb': 0}
self.called = []
def _create_volume_from_image(self, expected_status, raw=False,

View File

@ -111,6 +111,7 @@ class BaseVolumeTestCase(test.TestCase):
self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True)
self.stubs.Set(os.path, 'exists', lambda x: True)
self.volume.driver.set_initialized()
self.volume.stats = {'allocated_capacity_gb': 0}
# keep ordered record of what we execute
self.called = []

View File

@ -191,6 +191,7 @@ class VolumeManager(manager.SchedulerDependentManager):
self.configuration = Configuration(volume_manager_opts,
config_group=service_name)
self._tp = GreenPool()
self.stats = {}
if not volume_driver:
# Get from configuration, which will get the default
@ -241,8 +242,13 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.debug(_("Re-exporting %s volumes"), len(volumes))
try:
sum = 0
self.stats.update({'allocated_capacity_gb': sum})
for volume in volumes:
if volume['status'] in ['available', 'in-use']:
# calculate allocated capacity for driver
sum += volume['size']
self.stats['allocated_capacity_gb'] = sum
self.driver.ensure_export(ctxt, volume)
elif volume['status'] == 'downloading':
LOG.info(_("volume %s stuck in a downloading state"),
@ -339,6 +345,8 @@ class VolumeManager(manager.SchedulerDependentManager):
# Fetch created volume from storage
volume_ref = flow_engine.storage.fetch('volume')
# Update volume stats
self.stats['allocated_capacity_gb'] += volume_ref['size']
return volume_ref['id']
@utils.require_driver_initialized
@ -415,6 +423,7 @@ class VolumeManager(manager.SchedulerDependentManager):
if reservations:
QUOTAS.commit(context, reservations, project_id=project_id)
self.stats['allocated_capacity_gb'] -= volume_ref['size']
self.publish_service_capabilities(context)
return True
@ -923,8 +932,9 @@ class VolumeManager(manager.SchedulerDependentManager):
else:
volume_stats = self.driver.get_volume_stats(refresh=True)
if volume_stats:
# This will grab info about the host and queue it
# to be sent to the Schedulers.
# Append volume stats with 'allocated_capacity_gb'
volume_stats.update(self.stats)
# queue it to be sent to the Schedulers.
self.update_service_capabilities(volume_stats)
def publish_service_capabilities(self, context):
@ -999,6 +1009,7 @@ class VolumeManager(manager.SchedulerDependentManager):
QUOTAS.commit(context, reservations)
self.db.volume_update(context, volume['id'], {'size': int(new_size),
'status': 'available'})
self.stats['allocated_capacity_gb'] += size_increase
self._notify_about_volume_usage(
context, volume, "resize.end",
extra_usage_info={'size': int(new_size)})

View File

@ -996,6 +996,10 @@
# numbers mean to stack vs spread. (floating point value)
#capacity_weight_multiplier=1.0
# Multiplier used for weighing volume capacity. Negative
# numbers mean to stack vs spread. (floating point value)
#allocated_capacity_weight_multiplier=-1.0
#
# Options defined in cinder.transfer.api

View File

@ -46,6 +46,7 @@ cinder.scheduler.filters =
JsonFilter = cinder.openstack.common.scheduler.filters.json_filter:JsonFilter
RetryFilter = cinder.scheduler.filters.retry_filter:RetryFilter
cinder.scheduler.weights =
AllocatedCapacityWeigher = cinder.scheduler.weights.capacity:AllocatedCapacityWeigher
CapacityWeigher = cinder.scheduler.weights.capacity:CapacityWeigher
ChanceWeigher = cinder.scheduler.weights.chance:ChanceWeigher