Initiating Cinder Volume Manager with a large number of volumes
This patch adds an option to split the volumes and snapshots retrieved during Volume Manager host initialization into chunks. Query results are obtained from the database in batches rather than in one shot, to avoid extreme memory usage. The maximum number of volumes and snapshots per batch is controlled by the new option init_host_max_objects_retrieval. The option is disabled by default for backward compatibility.

To migrate any ConfKeyManager keys based on fixed_key to the currently configured key manager, the Volume Manager now uses a special lightweight object, VolumeMigration, instead of the full Volume object.

Change-Id: I53eccc77fdc2c35b27ed430af62dc18e7d1bde69
Closes-Bug: 1681374
parent 5b37d2aa33
commit ed02273d2f
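
The batching scheme described in the commit message comes down to walking the object set in fixed-size limit/offset windows. A minimal illustrative sketch of that idea follows (not code from this patch; the fetch callable and its signature are assumptions):

    def iterate_in_batches(total_count, batch_size, fetch):
        # Yield result sets of at most 'batch_size' objects; 'fetch' stands in
        # for a DB query helper that supports limit/offset.
        if batch_size <= 0:
            # Batching disabled: fall back to a single unbounded query.
            yield fetch(limit=None, offset=None)
            return
        for offset in range(0, total_count, batch_size):
            yield fetch(limit=batch_size, offset=offset)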
@@ -320,9 +320,9 @@ def volume_get_all_by_project(context, project_id, marker, limit,
                                           offset=offset)
 
 
-def get_volume_summary(context, project_only):
+def get_volume_summary(context, project_only, filters=None):
     """Get volume summary."""
-    return IMPL.get_volume_summary(context, project_only)
+    return IMPL.get_volume_summary(context, project_only, filters)
 
 
 def volume_update(context, volume_id, values):
@@ -528,6 +528,11 @@ def snapshot_get_all_active_by_window(context, begin, end=None,
                                                     project_id)
 
 
+def get_snapshot_summary(context, project_only, filters=None):
+    """Get snapshot summary."""
+    return IMPL.get_snapshot_summary(context, project_only, filters)
+
+
 ####################
 
 
@@ -2222,11 +2222,15 @@ def volume_get_all(context, marker=None, limit=None, sort_keys=None,
 
 
 @require_context
-def get_volume_summary(context, project_only):
+def get_volume_summary(context, project_only, filters=None):
     """Retrieves all volumes summary.
 
     :param context: context to query under
     :param project_only: limit summary to project volumes
+    :param filters: dictionary of filters; values that are in lists, tuples,
+                    or sets cause an 'IN' operation, while exact matching
+                    is used for other values, see _process_volume_filters
+                    function for more information
     :returns: volume summary
     """
     if not (project_only or is_admin_context(context)):
@@ -2236,6 +2240,9 @@ def get_volume_summary(context, project_only):
     if project_only:
         query = query.filter_by(project_id=context.project_id)
 
+    if filters:
+        query = _process_volume_filters(query, filters)
+
     if query is None:
         return []
 
@@ -3357,6 +3364,40 @@ def snapshot_update(context, snapshot_id, values):
         raise exception.SnapshotNotFound(snapshot_id=snapshot_id)
 
 
+@require_context
+def get_snapshot_summary(context, project_only, filters=None):
+    """Retrieves all snapshots summary.
+
+    :param context: context to query under
+    :param project_only: limit summary to snapshots
+    :param filters: dictionary of filters; values that are in lists, tuples,
+                    or sets cause an 'IN' operation, while exact matching
+                    is used for other values, see _process_snaps_filters
+                    function for more information
+    :returns: snapshots summary
+    """
+
+    if not (project_only or is_admin_context(context)):
+        raise exception.AdminRequired()
+
+    query = model_query(context, func.count(models.Snapshot.id),
+                        func.sum(models.Snapshot.volume_size),
+                        read_deleted="no")
+
+    if project_only:
+        query = query.filter_by(project_id=context.project_id)
+
+    if filters:
+        query = _process_snaps_filters(query, filters)
+
+    if query is None:
+        return []
+
+    result = query.first()
+
+    return result[0] or 0, result[1] or 0
+
+
 ####################
 
 
@@ -27,6 +27,7 @@ from keystoneauth1 import session as ks_session
 from cinder import context
 from cinder import coordination
 from cinder import objects
+from cinder.volume import volume_migration as volume_migration
 
 LOG = logging.getLogger(__name__)
 
@@ -145,7 +146,8 @@ class KeyMigrator(object):
         item.encryption_key_id = encryption_key_id
         item.save()
 
-        if isinstance(item, objects.volume.Volume):
+        allowTypes = (volume_migration.VolumeMigration, objects.volume.Volume)
+        if isinstance(item, allowTypes):
             snapshots = objects.snapshot.SnapshotList.get_all_for_volume(
                 self.admin_context,
                 item.id)
@@ -360,3 +360,8 @@ class SnapshotList(base.ObjectListBase, base.CinderObject):
         expected_attrs = Snapshot._get_expected_attrs(context)
         return base.obj_make_list(context, cls(context), objects.Snapshot,
                                   snapshots, expected_attrs=expected_attrs)
+
+    @classmethod
+    def get_snapshot_summary(cls, context, project_only, filters=None):
+        summary = db.get_snapshot_summary(context, project_only, filters)
+        return summary
@@ -653,8 +653,8 @@ class VolumeList(base.ObjectListBase, base.CinderObject):
                                   volumes, expected_attrs=expected_attrs)
 
     @classmethod
-    def get_volume_summary(cls, context, project_only):
-        volumes = db.get_volume_summary(context, project_only)
+    def get_volume_summary(cls, context, project_only, filters=None):
+        volumes = db.get_volume_summary(context, project_only, filters)
         return volumes
 
     @classmethod
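
For orientation, the manager-side code later in this change unpacks the volume summary as a three-value tuple and the snapshot summary as a two-value tuple, count first. A hedged usage sketch, with an assumed host filter similar to the one the manager builds:

    from cinder import context as cinder_context
    from cinder import objects

    ctxt = cinder_context.get_admin_context()
    filters = {'host': 'myhost@lvm'}  # illustrative value only
    num_vols, __, __ = objects.VolumeList.get_volume_summary(
        ctxt, False, filters)
    num_snaps, __ = objects.SnapshotList.get_snapshot_summary(
        ctxt, False, filters)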
@@ -26,6 +26,7 @@ from cinder.tests.unit import utils as tests_utils
 from cinder.tests.unit import volume as base
 from cinder.volume import driver
 from cinder.volume import utils as volutils
+from cinder.volume import volume_migration as volume_migration
 
 
 CONF = cfg.CONF
@@ -81,6 +82,14 @@ class VolumeInitHostTestCase(base.BaseVolumeTestCase):
         self.volume.delete_volume(self.context, vol3)
         self.volume.delete_volume(self.context, vol4)
 
+    def test_init_host_count_allocated_capacity_batch_retrieval(self):
+        old_val = CONF.init_host_max_objects_retrieval
+        CONF.init_host_max_objects_retrieval = 1
+        try:
+            self.test_init_host_count_allocated_capacity()
+        finally:
+            CONF.init_host_max_objects_retrieval = old_val
+
     @mock.patch('cinder.manager.CleanableManager.init_host')
     def test_init_host_count_allocated_capacity_cluster(self, init_host_mock):
         cluster_name = 'mycluster'
@@ -161,6 +170,14 @@ class VolumeInitHostTestCase(base.BaseVolumeTestCase):
         self.volume.delete_volume(self.context, vol0)
         self.volume.delete_volume(self.context, vol1)
 
+    def test_init_host_sync_provider_info_batch_retrieval(self):
+        old_val = CONF.init_host_max_objects_retrieval
+        CONF.init_host_max_objects_retrieval = 1
+        try:
+            self.test_init_host_sync_provider_info()
+        finally:
+            CONF.init_host_max_objects_retrieval = old_val
+
     @mock.patch.object(driver.BaseVD, "update_provider_info")
     def test_init_host_sync_provider_info_no_update(self, mock_update):
         vol0 = tests_utils.create_volume(
@@ -267,9 +284,11 @@ class VolumeInitHostTestCase(base.BaseVolumeTestCase):
         group_include_mock.assert_called_once_with(mock.ANY, cluster,
                                                    host=self.volume.host)
         vol_get_all_mock.assert_called_once_with(
-            mock.ANY, filters={'cluster_name': cluster})
+            mock.ANY, filters={'cluster_name': cluster},
+            limit=None, offset=None)
         snap_get_all_mock.assert_called_once_with(
-            mock.ANY, filters={'cluster_name': cluster})
+            mock.ANY, filters={'cluster_name': cluster},
+            limit=None, offset=None)
 
     @mock.patch('cinder.keymgr.migration.migrate_fixed_key')
     @mock.patch('cinder.volume.manager.VolumeManager._get_my_volumes')
@@ -280,9 +299,13 @@ class VolumeInitHostTestCase(base.BaseVolumeTestCase):
                                          mock_migrate_fixed_key):
 
         self.volume.init_host(service_id=self.service_id)
 
+        volumes = mock_get_my_volumes()
+        volumes_to_migrate = volume_migration.VolumeMigrationList()
+        volumes_to_migrate.append(volumes, self.context)
+
         mock_add_threadpool.assert_called_once_with(
             mock_migrate_fixed_key,
-            volumes=mock_get_my_volumes())
+            volumes=volumes_to_migrate)
 
     @mock.patch('time.sleep')
     def test_init_host_retry(self, mock_sleep):
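
The two batch-retrieval tests above toggle the option directly on CONF and restore it in a finally block. If the test base class's override_config helper is available here (an assumption), the same intent could be expressed more compactly:

    def test_init_host_count_allocated_capacity_batch_retrieval(self):
        # override_config (if available) restores the original value when
        # the test finishes, so no try/finally is needed.
        self.override_config('init_host_max_objects_retrieval', 1)
        self.test_init_host_count_allocated_capacity()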
@@ -72,7 +72,8 @@ class ReplicationTestCase(base.BaseVolumeTestCase):
 
         self.manager.failover_host(self.context, new_backend)
         mock_getall.assert_called_once_with(self.context,
-                                            filters={'host': self.host})
+                                            filters={'host': self.host},
+                                            limit=None, offset=None)
         mock_failover.assert_called_once_with(self.context,
                                               [],
                                               secondary_id=new_backend,
@@ -83,6 +83,7 @@ from cinder.volume.flows.manager import manage_existing_snapshot
 from cinder.volume import group_types
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as vol_utils
+from cinder.volume import volume_migration as volume_migration
 from cinder.volume import volume_types
 
 LOG = logging.getLogger(__name__)
@@ -129,6 +130,13 @@ volume_manager_opts = [
                help='Maximum times to reintialize the driver '
                     'if volume initialization fails. The interval of retry is '
                     'exponentially backoff, and will be 1s, 2s, 4s etc.'),
+    cfg.IntOpt('init_host_max_objects_retrieval',
+               default=0,
+               help='Max number of volumes and snapshots to be retrieved '
+                    'per batch during volume manager host initialization. '
+                    'Query results will be obtained in batches from the '
+                    'database and not in one shot to avoid extreme memory '
+                    'usage. Set 0 to turn off this functionality.'),
 ]
 
 volume_backend_opts = [
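
To make the option's effect concrete, here is a small worked example (the numbers are made up, not from the patch): the batch loop introduced below iterates over offsets produced by range(0, max_objs_num, req_limit), so the number of database round trips is driven by the larger of the two object counts.

    # Illustrative values only: 250 volumes, 40 snapshots, option set to 100.
    req_limit = 100  # init_host_max_objects_retrieval
    num_vols, num_snaps = 250, 40
    max_objs_num = max(num_vols, num_snaps)
    offsets = list(range(0, max_objs_num, req_limit))
    assert offsets == [0, 100, 200]  # three batches; later snapshot batches are empty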
@@ -462,36 +470,83 @@ class VolumeManager(manager.CleanableManager,
         # Initialize backend capabilities list
         self.driver.init_capabilities()
 
-        volumes = self._get_my_volumes(ctxt)
-        snapshots = self._get_my_snapshots(ctxt)
-        self._sync_provider_info(ctxt, volumes, snapshots)
-        # FIXME volume count for exporting is wrong
-
         # Zero stats
         self.stats['pools'] = {}
         self.stats.update({'allocated_capacity_gb': 0})
 
-        try:
-            for volume in volumes:
-                # available volume should also be counted into allocated
-                if volume['status'] in ['in-use', 'available']:
-                    # calculate allocated capacity for driver
-                    self._count_allocated_capacity(ctxt, volume)
+        # Batch retrieval volumes and snapshots
 
-                    try:
-                        if volume['status'] in ['in-use']:
-                            self.driver.ensure_export(ctxt, volume)
-                    except Exception:
-                        LOG.exception("Failed to re-export volume, "
-                                      "setting to ERROR.",
-                                      resource=volume)
-                        volume.conditional_update({'status': 'error'},
-                                                  {'status': 'in-use'})
-            # All other cleanups are processed by parent class CleanableManager
+        num_vols, num_snaps, max_objs_num, req_range = None, None, None, [0]
+        req_limit = CONF.init_host_max_objects_retrieval
+        use_batch_objects_retrieval = req_limit > 0
 
-        except Exception:
-            LOG.exception("Error during re-export on driver init.",
-                          resource=volume)
-            return
+        if use_batch_objects_retrieval:
+            # Get total number of volumes
+            num_vols, __, __ = self._get_my_volumes_summary(ctxt)
+            # Get total number of snapshots
+            num_snaps, __ = self._get_my_snapshots_summary(ctxt)
+            # Calculate highest number of the objects (volumes or snapshots)
+            max_objs_num = max(num_vols, num_snaps)
+            # Make batch request loop counter
+            req_range = range(0, max_objs_num, req_limit)
+
+        volumes_to_migrate = volume_migration.VolumeMigrationList()
+
+        for req_offset in req_range:
+
+            # Retrieve 'req_limit' number of objects starting from
+            # 'req_offset' position
+            volumes, snapshots = None, None
+            if use_batch_objects_retrieval:
+                if req_offset < num_vols:
+                    volumes = self._get_my_volumes(ctxt,
+                                                   limit=req_limit,
+                                                   offset=req_offset)
+                else:
+                    volumes = objects.VolumeList()
+                if req_offset < num_snaps:
+                    snapshots = self._get_my_snapshots(ctxt,
+                                                       limit=req_limit,
+                                                       offset=req_offset)
+                else:
+                    snapshots = objects.SnapshotList()
+            # or retrieve all volumes and snapshots per single request
+            else:
+                volumes = self._get_my_volumes(ctxt)
+                snapshots = self._get_my_snapshots(ctxt)
+
+            self._sync_provider_info(ctxt, volumes, snapshots)
+            # FIXME volume count for exporting is wrong
+
+            try:
+                for volume in volumes:
+                    # available volume should also be counted into allocated
+                    if volume['status'] in ['in-use', 'available']:
+                        # calculate allocated capacity for driver
+                        self._count_allocated_capacity(ctxt, volume)
+
+                        try:
+                            if volume['status'] in ['in-use']:
+                                self.driver.ensure_export(ctxt, volume)
+                        except Exception:
+                            LOG.exception("Failed to re-export volume, "
+                                          "setting to ERROR.",
+                                          resource=volume)
+                            volume.conditional_update({'status': 'error'},
+                                                      {'status': 'in-use'})
+                # All other cleanups are processed by parent class -
+                # CleanableManager
+
+            except Exception:
+                LOG.exception("Error during re-export on driver init.",
+                              resource=volume)
+                return
+
+            if len(volumes):
+                volumes_to_migrate.append(volumes, ctxt)
+
+            del volumes
+            del snapshots
 
         self.driver.set_throttle()
 
@@ -507,7 +562,7 @@ class VolumeManager(manager.CleanableManager,
         # Migrate any ConfKeyManager keys based on fixed_key to the currently
         # configured key manager.
         self._add_to_threadpool(key_migration.migrate_fixed_key,
-                                volumes=volumes)
+                                volumes=volumes_to_migrate)
 
         # collect and publish service capabilities
         self.publish_service_capabilities(ctxt)
@@ -2923,15 +2978,27 @@ class VolumeManager(manager.CleanableManager,
             filters = {'host': self.host}
         return filters
 
-    def _get_my_resources(self, ctxt, ovo_class_list):
+    def _get_my_volumes_summary(self, ctxt):
         filters = self._get_cluster_or_host_filters()
-        return getattr(ovo_class_list, 'get_all')(ctxt, filters=filters)
+        return objects.VolumeList.get_volume_summary(ctxt, False, filters)
 
-    def _get_my_volumes(self, ctxt):
-        return self._get_my_resources(ctxt, objects.VolumeList)
+    def _get_my_snapshots_summary(self, ctxt):
+        filters = self._get_cluster_or_host_filters()
+        return objects.SnapshotList.get_snapshot_summary(ctxt, False, filters)
 
-    def _get_my_snapshots(self, ctxt):
-        return self._get_my_resources(ctxt, objects.SnapshotList)
+    def _get_my_resources(self, ctxt, ovo_class_list, limit=None, offset=None):
+        filters = self._get_cluster_or_host_filters()
+        return getattr(ovo_class_list, 'get_all')(ctxt, filters=filters,
+                                                  limit=limit,
+                                                  offset=offset)
+
+    def _get_my_volumes(self, ctxt, limit=None, offset=None):
+        return self._get_my_resources(ctxt, objects.VolumeList,
+                                      limit, offset)
+
+    def _get_my_snapshots(self, ctxt, limit=None, offset=None):
+        return self._get_my_resources(ctxt, objects.SnapshotList,
+                                      limit, offset)
 
     def get_manageable_volumes(self, ctxt, marker, limit, offset, sort_keys,
                                sort_dirs, want_objects=False):
cinder/volume/volume_migration.py (new file, 72 lines)
@@ -0,0 +1,72 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from cinder import db
+from cinder import objects
+
+
+class VolumeMigration(object):
+    """Lightweight Volume Migration object.
+
+    Will be used by KeyMigrator instead of regular Volume object to avoid
+    extra memory usage.
+    """
+
+    @staticmethod
+    def from_volume(volume, context):
+        volume_migration = VolumeMigration(volume.id,
+                                           volume.user_id,
+                                           volume.encryption_key_id)
+        volume_migration._context = context
+        return volume_migration
+
+    def __init__(self, id, user_id, encryption_key_id):
+        self.id = id
+        self.user_id = user_id
+        self.orig_encryption_key_id = encryption_key_id
+        self.encryption_key_id = encryption_key_id
+
+    def _get_updates(self):
+        updates = {}
+        if self.orig_encryption_key_id != self.encryption_key_id:
+            updates['encryption_key_id'] = self.encryption_key_id
+        return updates
+
+    def _reset_changes(self):
+        self.orig_encryption_key_id = self.encryption_key_id
+
+    def save(self):
+        updates = self._get_updates()
+        if updates:
+            db.volume_update(self._context, self.id, updates)
+            self._reset_changes()
+
+    def __str__(self):
+        return 'id = {}'.format(self.id)
+
+    def __repr__(self):
+        return self.__str__()
+
+
+class VolumeMigrationList(list):
+
+    def __init__(self):
+        list.__init__(self)
+
+    def append(self, volumes, context):
+
+        if not isinstance(volumes, objects.volume.VolumeList):
+            return
+
+        for volume in volumes:
+            volume_migration = VolumeMigration.from_volume(volume, context)
+            super(VolumeMigrationList, self).append(volume_migration)
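
A hedged usage sketch of the new helpers, mirroring how init_host and the key migration consume them (the admin context and filter value are illustrative; in the patch the VolumeList comes from _get_my_volumes):

    from cinder import context as cinder_context
    from cinder import objects
    from cinder.volume import volume_migration

    ctxt = cinder_context.get_admin_context()
    volumes = objects.VolumeList.get_all(ctxt, filters={'host': 'myhost@lvm'})

    # append() accepts only VolumeList inputs and silently ignores anything
    # else; each entry keeps just id, user_id and encryption_key_id.
    volumes_to_migrate = volume_migration.VolumeMigrationList()
    volumes_to_migrate.append(volumes, ctxt)

    for item in volumes_to_migrate:
        item.encryption_key_id = 'new-key-id'  # hypothetical new key reference
        item.save()  # issues volume_update() only because the key id changed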
@@ -0,0 +1,9 @@
+---
+upgrade:
+  - Volume Manager now uses the configuration option
+    ``init_host_max_objects_retrieval`` to set the maximum
+    number of volumes and snapshots to be retrieved per
+    batch during volume manager host initialization. Query
+    results will be obtained in batches from the database
+    and not in one shot to avoid extreme memory usage.
+    The default value is 0, which disables this functionality.