Merge "Initiating Cinder Volume Manager with large number of volumes"
This commit is contained in:
commit
211df9a0f9
@ -320,9 +320,9 @@ def volume_get_all_by_project(context, project_id, marker, limit,
|
||||
offset=offset)
|
||||
|
||||
|
||||
def get_volume_summary(context, project_only):
|
||||
def get_volume_summary(context, project_only, filters=None):
|
||||
"""Get volume summary."""
|
||||
return IMPL.get_volume_summary(context, project_only)
|
||||
return IMPL.get_volume_summary(context, project_only, filters)
|
||||
|
||||
|
||||
def volume_update(context, volume_id, values):
|
||||
@ -528,6 +528,11 @@ def snapshot_get_all_active_by_window(context, begin, end=None,
|
||||
project_id)
|
||||
|
||||
|
||||
def get_snapshot_summary(context, project_only, filters=None):
|
||||
"""Get snapshot summary."""
|
||||
return IMPL.get_snapshot_summary(context, project_only, filters)
|
||||
|
||||
|
||||
####################
|
||||
|
||||
|
||||
|
@ -2222,11 +2222,15 @@ def volume_get_all(context, marker=None, limit=None, sort_keys=None,
|
||||
|
||||
|
||||
@require_context
|
||||
def get_volume_summary(context, project_only):
|
||||
def get_volume_summary(context, project_only, filters=None):
|
||||
"""Retrieves all volumes summary.
|
||||
|
||||
:param context: context to query under
|
||||
:param project_only: limit summary to project volumes
|
||||
:param filters: dictionary of filters; values that are in lists, tuples,
|
||||
or sets cause an 'IN' operation, while exact matching
|
||||
is used for other values, see _process_volume_filters
|
||||
function for more information
|
||||
:returns: volume summary
|
||||
"""
|
||||
if not (project_only or is_admin_context(context)):
|
||||
@ -2236,6 +2240,9 @@ def get_volume_summary(context, project_only):
|
||||
if project_only:
|
||||
query = query.filter_by(project_id=context.project_id)
|
||||
|
||||
if filters:
|
||||
query = _process_volume_filters(query, filters)
|
||||
|
||||
if query is None:
|
||||
return []
|
||||
|
||||
@ -3357,6 +3364,40 @@ def snapshot_update(context, snapshot_id, values):
|
||||
raise exception.SnapshotNotFound(snapshot_id=snapshot_id)
|
||||
|
||||
|
||||
@require_context
|
||||
def get_snapshot_summary(context, project_only, filters=None):
|
||||
"""Retrieves all snapshots summary.
|
||||
|
||||
:param context: context to query under
|
||||
:param project_only: limit summary to snapshots
|
||||
:param filters: dictionary of filters; values that are in lists, tuples,
|
||||
or sets cause an 'IN' operation, while exact matching
|
||||
is used for other values, see _process_snaps_filters
|
||||
function for more information
|
||||
:returns: snapshots summary
|
||||
"""
|
||||
|
||||
if not (project_only or is_admin_context(context)):
|
||||
raise exception.AdminRequired()
|
||||
|
||||
query = model_query(context, func.count(models.Snapshot.id),
|
||||
func.sum(models.Snapshot.volume_size),
|
||||
read_deleted="no")
|
||||
|
||||
if project_only:
|
||||
query = query.filter_by(project_id=context.project_id)
|
||||
|
||||
if filters:
|
||||
query = _process_snaps_filters(query, filters)
|
||||
|
||||
if query is None:
|
||||
return []
|
||||
|
||||
result = query.first()
|
||||
|
||||
return result[0] or 0, result[1] or 0
|
||||
|
||||
|
||||
####################
|
||||
|
||||
|
||||
|
@ -27,6 +27,7 @@ from keystoneauth1 import session as ks_session
|
||||
from cinder import context
|
||||
from cinder import coordination
|
||||
from cinder import objects
|
||||
from cinder.volume import volume_migration as volume_migration
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -145,7 +146,8 @@ class KeyMigrator(object):
|
||||
item.encryption_key_id = encryption_key_id
|
||||
item.save()
|
||||
|
||||
if isinstance(item, objects.volume.Volume):
|
||||
allowTypes = (volume_migration.VolumeMigration, objects.volume.Volume)
|
||||
if isinstance(item, allowTypes):
|
||||
snapshots = objects.snapshot.SnapshotList.get_all_for_volume(
|
||||
self.admin_context,
|
||||
item.id)
|
||||
|
@ -360,3 +360,8 @@ class SnapshotList(base.ObjectListBase, base.CinderObject):
|
||||
expected_attrs = Snapshot._get_expected_attrs(context)
|
||||
return base.obj_make_list(context, cls(context), objects.Snapshot,
|
||||
snapshots, expected_attrs=expected_attrs)
|
||||
|
||||
@classmethod
|
||||
def get_snapshot_summary(cls, context, project_only, filters=None):
|
||||
summary = db.get_snapshot_summary(context, project_only, filters)
|
||||
return summary
|
||||
|
@ -653,8 +653,8 @@ class VolumeList(base.ObjectListBase, base.CinderObject):
|
||||
volumes, expected_attrs=expected_attrs)
|
||||
|
||||
@classmethod
|
||||
def get_volume_summary(cls, context, project_only):
|
||||
volumes = db.get_volume_summary(context, project_only)
|
||||
def get_volume_summary(cls, context, project_only, filters=None):
|
||||
volumes = db.get_volume_summary(context, project_only, filters)
|
||||
return volumes
|
||||
|
||||
@classmethod
|
||||
|
@ -26,6 +26,7 @@ from cinder.tests.unit import utils as tests_utils
|
||||
from cinder.tests.unit import volume as base
|
||||
from cinder.volume import driver
|
||||
from cinder.volume import utils as volutils
|
||||
from cinder.volume import volume_migration as volume_migration
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -81,6 +82,14 @@ class VolumeInitHostTestCase(base.BaseVolumeTestCase):
|
||||
self.volume.delete_volume(self.context, vol3)
|
||||
self.volume.delete_volume(self.context, vol4)
|
||||
|
||||
def test_init_host_count_allocated_capacity_batch_retrieval(self):
|
||||
old_val = CONF.init_host_max_objects_retrieval
|
||||
CONF.init_host_max_objects_retrieval = 1
|
||||
try:
|
||||
self.test_init_host_count_allocated_capacity()
|
||||
finally:
|
||||
CONF.init_host_max_objects_retrieval = old_val
|
||||
|
||||
@mock.patch('cinder.manager.CleanableManager.init_host')
|
||||
def test_init_host_count_allocated_capacity_cluster(self, init_host_mock):
|
||||
cluster_name = 'mycluster'
|
||||
@ -161,6 +170,14 @@ class VolumeInitHostTestCase(base.BaseVolumeTestCase):
|
||||
self.volume.delete_volume(self.context, vol0)
|
||||
self.volume.delete_volume(self.context, vol1)
|
||||
|
||||
def test_init_host_sync_provider_info_batch_retrieval(self):
|
||||
old_val = CONF.init_host_max_objects_retrieval
|
||||
CONF.init_host_max_objects_retrieval = 1
|
||||
try:
|
||||
self.test_init_host_sync_provider_info()
|
||||
finally:
|
||||
CONF.init_host_max_objects_retrieval = old_val
|
||||
|
||||
@mock.patch.object(driver.BaseVD, "update_provider_info")
|
||||
def test_init_host_sync_provider_info_no_update(self, mock_update):
|
||||
vol0 = tests_utils.create_volume(
|
||||
@ -267,9 +284,11 @@ class VolumeInitHostTestCase(base.BaseVolumeTestCase):
|
||||
group_include_mock.assert_called_once_with(mock.ANY, cluster,
|
||||
host=self.volume.host)
|
||||
vol_get_all_mock.assert_called_once_with(
|
||||
mock.ANY, filters={'cluster_name': cluster})
|
||||
mock.ANY, filters={'cluster_name': cluster},
|
||||
limit=None, offset=None)
|
||||
snap_get_all_mock.assert_called_once_with(
|
||||
mock.ANY, filters={'cluster_name': cluster})
|
||||
mock.ANY, filters={'cluster_name': cluster},
|
||||
limit=None, offset=None)
|
||||
|
||||
@mock.patch('cinder.keymgr.migration.migrate_fixed_key')
|
||||
@mock.patch('cinder.volume.manager.VolumeManager._get_my_volumes')
|
||||
@ -280,9 +299,13 @@ class VolumeInitHostTestCase(base.BaseVolumeTestCase):
|
||||
mock_migrate_fixed_key):
|
||||
|
||||
self.volume.init_host(service_id=self.service_id)
|
||||
|
||||
volumes = mock_get_my_volumes()
|
||||
volumes_to_migrate = volume_migration.VolumeMigrationList()
|
||||
volumes_to_migrate.append(volumes, self.context)
|
||||
mock_add_threadpool.assert_called_once_with(
|
||||
mock_migrate_fixed_key,
|
||||
volumes=mock_get_my_volumes())
|
||||
volumes=volumes_to_migrate)
|
||||
|
||||
@mock.patch('time.sleep')
|
||||
def test_init_host_retry(self, mock_sleep):
|
||||
|
@ -72,7 +72,8 @@ class ReplicationTestCase(base.BaseVolumeTestCase):
|
||||
|
||||
self.manager.failover_host(self.context, new_backend)
|
||||
mock_getall.assert_called_once_with(self.context,
|
||||
filters={'host': self.host})
|
||||
filters={'host': self.host},
|
||||
limit=None, offset=None)
|
||||
mock_failover.assert_called_once_with(self.context,
|
||||
[],
|
||||
secondary_id=new_backend,
|
||||
|
@ -83,6 +83,7 @@ from cinder.volume.flows.manager import manage_existing_snapshot
|
||||
from cinder.volume import group_types
|
||||
from cinder.volume import rpcapi as volume_rpcapi
|
||||
from cinder.volume import utils as vol_utils
|
||||
from cinder.volume import volume_migration as volume_migration
|
||||
from cinder.volume import volume_types
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -129,6 +130,13 @@ volume_manager_opts = [
|
||||
help='Maximum times to reintialize the driver '
|
||||
'if volume initialization fails. The interval of retry is '
|
||||
'exponentially backoff, and will be 1s, 2s, 4s etc.'),
|
||||
cfg.IntOpt('init_host_max_objects_retrieval',
|
||||
default=0,
|
||||
help='Max number of volumes and snapshots to be retrieved '
|
||||
'per batch during volume manager host initialization. '
|
||||
'Query results will be obtained in batches from the '
|
||||
'database and not in one shot to avoid extreme memory '
|
||||
'usage. Set 0 to turn off this functionality.'),
|
||||
]
|
||||
|
||||
volume_backend_opts = [
|
||||
@ -462,36 +470,83 @@ class VolumeManager(manager.CleanableManager,
|
||||
# Initialize backend capabilities list
|
||||
self.driver.init_capabilities()
|
||||
|
||||
volumes = self._get_my_volumes(ctxt)
|
||||
snapshots = self._get_my_snapshots(ctxt)
|
||||
self._sync_provider_info(ctxt, volumes, snapshots)
|
||||
# FIXME volume count for exporting is wrong
|
||||
|
||||
# Zero stats
|
||||
self.stats['pools'] = {}
|
||||
self.stats.update({'allocated_capacity_gb': 0})
|
||||
|
||||
try:
|
||||
for volume in volumes:
|
||||
# available volume should also be counted into allocated
|
||||
if volume['status'] in ['in-use', 'available']:
|
||||
# calculate allocated capacity for driver
|
||||
self._count_allocated_capacity(ctxt, volume)
|
||||
# Batch retrieval volumes and snapshots
|
||||
|
||||
try:
|
||||
if volume['status'] in ['in-use']:
|
||||
self.driver.ensure_export(ctxt, volume)
|
||||
except Exception:
|
||||
LOG.exception("Failed to re-export volume, "
|
||||
"setting to ERROR.",
|
||||
resource=volume)
|
||||
volume.conditional_update({'status': 'error'},
|
||||
{'status': 'in-use'})
|
||||
# All other cleanups are processed by parent class CleanableManager
|
||||
num_vols, num_snaps, max_objs_num, req_range = None, None, None, [0]
|
||||
req_limit = CONF.init_host_max_objects_retrieval
|
||||
use_batch_objects_retrieval = req_limit > 0
|
||||
|
||||
except Exception:
|
||||
LOG.exception("Error during re-export on driver init.",
|
||||
resource=volume)
|
||||
return
|
||||
if use_batch_objects_retrieval:
|
||||
# Get total number of volumes
|
||||
num_vols, __, __ = self._get_my_volumes_summary(ctxt)
|
||||
# Get total number of snapshots
|
||||
num_snaps, __ = self._get_my_snapshots_summary(ctxt)
|
||||
# Calculate highest number of the objects (volumes or snapshots)
|
||||
max_objs_num = max(num_vols, num_snaps)
|
||||
# Make batch request loop counter
|
||||
req_range = range(0, max_objs_num, req_limit)
|
||||
|
||||
volumes_to_migrate = volume_migration.VolumeMigrationList()
|
||||
|
||||
for req_offset in req_range:
|
||||
|
||||
# Retrieve 'req_limit' number of objects starting from
|
||||
# 'req_offset' position
|
||||
volumes, snapshots = None, None
|
||||
if use_batch_objects_retrieval:
|
||||
if req_offset < num_vols:
|
||||
volumes = self._get_my_volumes(ctxt,
|
||||
limit=req_limit,
|
||||
offset=req_offset)
|
||||
else:
|
||||
volumes = objects.VolumeList()
|
||||
if req_offset < num_snaps:
|
||||
snapshots = self._get_my_snapshots(ctxt,
|
||||
limit=req_limit,
|
||||
offset=req_offset)
|
||||
else:
|
||||
snapshots = objects.SnapshotList()
|
||||
# or retrieve all volumes and snapshots per single request
|
||||
else:
|
||||
volumes = self._get_my_volumes(ctxt)
|
||||
snapshots = self._get_my_snapshots(ctxt)
|
||||
|
||||
self._sync_provider_info(ctxt, volumes, snapshots)
|
||||
# FIXME volume count for exporting is wrong
|
||||
|
||||
try:
|
||||
for volume in volumes:
|
||||
# available volume should also be counted into allocated
|
||||
if volume['status'] in ['in-use', 'available']:
|
||||
# calculate allocated capacity for driver
|
||||
self._count_allocated_capacity(ctxt, volume)
|
||||
|
||||
try:
|
||||
if volume['status'] in ['in-use']:
|
||||
self.driver.ensure_export(ctxt, volume)
|
||||
except Exception:
|
||||
LOG.exception("Failed to re-export volume, "
|
||||
"setting to ERROR.",
|
||||
resource=volume)
|
||||
volume.conditional_update({'status': 'error'},
|
||||
{'status': 'in-use'})
|
||||
# All other cleanups are processed by parent class -
|
||||
# CleanableManager
|
||||
|
||||
except Exception:
|
||||
LOG.exception("Error during re-export on driver init.",
|
||||
resource=volume)
|
||||
return
|
||||
|
||||
if len(volumes):
|
||||
volumes_to_migrate.append(volumes, ctxt)
|
||||
|
||||
del volumes
|
||||
del snapshots
|
||||
|
||||
self.driver.set_throttle()
|
||||
|
||||
@ -507,7 +562,7 @@ class VolumeManager(manager.CleanableManager,
|
||||
# Migrate any ConfKeyManager keys based on fixed_key to the currently
|
||||
# configured key manager.
|
||||
self._add_to_threadpool(key_migration.migrate_fixed_key,
|
||||
volumes=volumes)
|
||||
volumes=volumes_to_migrate)
|
||||
|
||||
# collect and publish service capabilities
|
||||
self.publish_service_capabilities(ctxt)
|
||||
@ -2923,15 +2978,27 @@ class VolumeManager(manager.CleanableManager,
|
||||
filters = {'host': self.host}
|
||||
return filters
|
||||
|
||||
def _get_my_resources(self, ctxt, ovo_class_list):
|
||||
def _get_my_volumes_summary(self, ctxt):
|
||||
filters = self._get_cluster_or_host_filters()
|
||||
return getattr(ovo_class_list, 'get_all')(ctxt, filters=filters)
|
||||
return objects.VolumeList.get_volume_summary(ctxt, False, filters)
|
||||
|
||||
def _get_my_volumes(self, ctxt):
|
||||
return self._get_my_resources(ctxt, objects.VolumeList)
|
||||
def _get_my_snapshots_summary(self, ctxt):
|
||||
filters = self._get_cluster_or_host_filters()
|
||||
return objects.SnapshotList.get_snapshot_summary(ctxt, False, filters)
|
||||
|
||||
def _get_my_snapshots(self, ctxt):
|
||||
return self._get_my_resources(ctxt, objects.SnapshotList)
|
||||
def _get_my_resources(self, ctxt, ovo_class_list, limit=None, offset=None):
|
||||
filters = self._get_cluster_or_host_filters()
|
||||
return getattr(ovo_class_list, 'get_all')(ctxt, filters=filters,
|
||||
limit=limit,
|
||||
offset=offset)
|
||||
|
||||
def _get_my_volumes(self, ctxt, limit=None, offset=None):
|
||||
return self._get_my_resources(ctxt, objects.VolumeList,
|
||||
limit, offset)
|
||||
|
||||
def _get_my_snapshots(self, ctxt, limit=None, offset=None):
|
||||
return self._get_my_resources(ctxt, objects.SnapshotList,
|
||||
limit, offset)
|
||||
|
||||
def get_manageable_volumes(self, ctxt, marker, limit, offset, sort_keys,
|
||||
sort_dirs, want_objects=False):
|
||||
|
72
cinder/volume/volume_migration.py
Normal file
72
cinder/volume/volume_migration.py
Normal file
@ -0,0 +1,72 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cinder import db
|
||||
from cinder import objects
|
||||
|
||||
|
||||
class VolumeMigration(object):
|
||||
"""Lightweight Volume Migration object.
|
||||
|
||||
Will be used by KeyMigrator instead of regular Volume object to avoid
|
||||
extra memory usage.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def from_volume(volume, context):
|
||||
volume_migration = VolumeMigration(volume.id,
|
||||
volume.user_id,
|
||||
volume.encryption_key_id)
|
||||
volume_migration._context = context
|
||||
return volume_migration
|
||||
|
||||
def __init__(self, id, user_id, encryption_key_id):
|
||||
self.id = id
|
||||
self.user_id = user_id
|
||||
self.orig_encryption_key_id = encryption_key_id
|
||||
self.encryption_key_id = encryption_key_id
|
||||
|
||||
def _get_updates(self):
|
||||
updates = {}
|
||||
if self.orig_encryption_key_id != self.encryption_key_id:
|
||||
updates['encryption_key_id'] = self.encryption_key_id
|
||||
return updates
|
||||
|
||||
def _reset_changes(self):
|
||||
self.orig_encryption_key_id = self.encryption_key_id
|
||||
|
||||
def save(self):
|
||||
updates = self._get_updates()
|
||||
if updates:
|
||||
db.volume_update(self._context, self.id, updates)
|
||||
self._reset_changes()
|
||||
|
||||
def __str__(self):
|
||||
return 'id = {}'.format(self.id)
|
||||
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
|
||||
class VolumeMigrationList(list):
|
||||
|
||||
def __init__(self):
|
||||
list.__init__(self)
|
||||
|
||||
def append(self, volumes, context):
|
||||
|
||||
if not isinstance(volumes, objects.volume.VolumeList):
|
||||
return
|
||||
|
||||
for volume in volumes:
|
||||
volume_migration = VolumeMigration.from_volume(volume, context)
|
||||
super(VolumeMigrationList, self).append(volume_migration)
|
@ -0,0 +1,9 @@
|
||||
---
|
||||
upgrade:
|
||||
- Volume Manager now uses the configuration option
|
||||
``init_host_max_objects`` retrieval to set max number of
|
||||
volumes and snapshots to be retrieved per batch during
|
||||
volume manager host initialization. Query results will
|
||||
be obtained in batches from the database and not in one
|
||||
shot to avoid extreme memory usage.
|
||||
Default value is 0 and disables this functionality.
|
Loading…
Reference in New Issue
Block a user