diff --git a/cinder/db/api.py b/cinder/db/api.py
index f1d2f93d5db..e60b845e3d7 100644
--- a/cinder/db/api.py
+++ b/cinder/db/api.py
@@ -320,9 +320,9 @@ def volume_get_all_by_project(context, project_id, marker, limit,
                                           offset=offset)
 
 
-def get_volume_summary(context, project_only):
+def get_volume_summary(context, project_only, filters=None):
     """Get volume summary."""
-    return IMPL.get_volume_summary(context, project_only)
+    return IMPL.get_volume_summary(context, project_only, filters)
 
 
 def volume_update(context, volume_id, values):
@@ -528,6 +528,11 @@ def snapshot_get_all_active_by_window(context, begin, end=None,
                                                   project_id)
 
 
+def get_snapshot_summary(context, project_only, filters=None):
+    """Get snapshot summary."""
+    return IMPL.get_snapshot_summary(context, project_only, filters)
+
+
 ####################
 
 
diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py
index 91792af6d0e..a787c6a951a 100644
--- a/cinder/db/sqlalchemy/api.py
+++ b/cinder/db/sqlalchemy/api.py
@@ -2222,11 +2222,15 @@ def volume_get_all(context, marker=None, limit=None, sort_keys=None,
 
 
 @require_context
-def get_volume_summary(context, project_only):
+def get_volume_summary(context, project_only, filters=None):
     """Retrieves all volumes summary.
 
     :param context: context to query under
     :param project_only: limit summary to project volumes
+    :param filters: dictionary of filters; values that are in lists, tuples,
+                    or sets cause an 'IN' operation, while exact matching
+                    is used for other values, see _process_volume_filters
+                    function for more information
     :returns: volume summary
     """
     if not (project_only or is_admin_context(context)):
@@ -2236,6 +2240,9 @@ def get_volume_summary(context, project_only):
     if project_only:
         query = query.filter_by(project_id=context.project_id)
 
+    if filters:
+        query = _process_volume_filters(query, filters)
+
     if query is None:
         return []
 
@@ -3357,6 +3364,40 @@ def snapshot_update(context, snapshot_id, values):
         raise exception.SnapshotNotFound(snapshot_id=snapshot_id)
 
 
+@require_context
+def get_snapshot_summary(context, project_only, filters=None):
+    """Retrieves all snapshots summary.
+
+    :param context: context to query under
+    :param project_only: limit summary to project snapshots
+    :param filters: dictionary of filters; values that are in lists, tuples,
+                    or sets cause an 'IN' operation, while exact matching
+                    is used for other values, see _process_snaps_filters
+                    function for more information
+    :returns: snapshots summary
+    """
+
+    if not (project_only or is_admin_context(context)):
+        raise exception.AdminRequired()
+
+    query = model_query(context, func.count(models.Snapshot.id),
+                        func.sum(models.Snapshot.volume_size),
+                        read_deleted="no")
+
+    if project_only:
+        query = query.filter_by(project_id=context.project_id)
+
+    if filters:
+        query = _process_snaps_filters(query, filters)
+
+    if query is None:
+        return []
+
+    result = query.first()
+
+    return result[0] or 0, result[1] or 0
+
+
 ####################
 
 
diff --git a/cinder/keymgr/migration.py b/cinder/keymgr/migration.py
index e0290dc87bc..7ec2379ef27 100644
--- a/cinder/keymgr/migration.py
+++ b/cinder/keymgr/migration.py
@@ -27,6 +27,7 @@ from keystoneauth1 import session as ks_session
 from cinder import context
 from cinder import coordination
 from cinder import objects
+from cinder.volume import volume_migration as volume_migration
 
 LOG = logging.getLogger(__name__)
 
@@ -145,7 +146,8 @@ class KeyMigrator(object):
         item.encryption_key_id = encryption_key_id
         item.save()
 
-        if isinstance(item, objects.volume.Volume):
+        allowTypes = (volume_migration.VolumeMigration, objects.volume.Volume)
+        if isinstance(item, allowTypes):
             snapshots = objects.snapshot.SnapshotList.get_all_for_volume(
                 self.admin_context, item.id)
 
diff --git a/cinder/objects/snapshot.py b/cinder/objects/snapshot.py
index 320156dca78..6669c4614c9 100644
--- a/cinder/objects/snapshot.py
+++ b/cinder/objects/snapshot.py
@@ -360,3 +360,8 @@ class SnapshotList(base.ObjectListBase, base.CinderObject):
         expected_attrs = Snapshot._get_expected_attrs(context)
         return base.obj_make_list(context, cls(context), objects.Snapshot,
                                   snapshots, expected_attrs=expected_attrs)
+
+    @classmethod
+    def get_snapshot_summary(cls, context, project_only, filters=None):
+        summary = db.get_snapshot_summary(context, project_only, filters)
+        return summary
diff --git a/cinder/objects/volume.py b/cinder/objects/volume.py
index b3a1dfce072..7999be36683 100644
--- a/cinder/objects/volume.py
+++ b/cinder/objects/volume.py
@@ -653,8 +653,8 @@ class VolumeList(base.ObjectListBase, base.CinderObject):
                                  volumes, expected_attrs=expected_attrs)
 
     @classmethod
-    def get_volume_summary(cls, context, project_only):
-        volumes = db.get_volume_summary(context, project_only)
+    def get_volume_summary(cls, context, project_only, filters=None):
+        volumes = db.get_volume_summary(context, project_only, filters)
         return volumes
 
     @classmethod
diff --git a/cinder/tests/unit/volume/test_init_host.py b/cinder/tests/unit/volume/test_init_host.py
index 9a97ebe47c6..0bad4f9db7d 100644
--- a/cinder/tests/unit/volume/test_init_host.py
+++ b/cinder/tests/unit/volume/test_init_host.py
@@ -26,6 +26,7 @@ from cinder.tests.unit import utils as tests_utils
 from cinder.tests.unit import volume as base
 from cinder.volume import driver
 from cinder.volume import utils as volutils
+from cinder.volume import volume_migration as volume_migration
 
 CONF = cfg.CONF
 
@@ -81,6 +82,14 @@ class VolumeInitHostTestCase(base.BaseVolumeTestCase):
         self.volume.delete_volume(self.context, vol3)
         self.volume.delete_volume(self.context, vol4)
 
+    def test_init_host_count_allocated_capacity_batch_retrieval(self):
+        old_val = CONF.init_host_max_objects_retrieval
+        CONF.init_host_max_objects_retrieval = 1
+        try:
+            self.test_init_host_count_allocated_capacity()
+        finally:
+            CONF.init_host_max_objects_retrieval = old_val
+
     @mock.patch('cinder.manager.CleanableManager.init_host')
     def test_init_host_count_allocated_capacity_cluster(self, init_host_mock):
         cluster_name = 'mycluster'
@@ -161,6 +170,14 @@ class VolumeInitHostTestCase(base.BaseVolumeTestCase):
         self.volume.delete_volume(self.context, vol0)
         self.volume.delete_volume(self.context, vol1)
 
+    def test_init_host_sync_provider_info_batch_retrieval(self):
+        old_val = CONF.init_host_max_objects_retrieval
+        CONF.init_host_max_objects_retrieval = 1
+        try:
+            self.test_init_host_sync_provider_info()
+        finally:
+            CONF.init_host_max_objects_retrieval = old_val
+
     @mock.patch.object(driver.BaseVD, "update_provider_info")
     def test_init_host_sync_provider_info_no_update(self, mock_update):
         vol0 = tests_utils.create_volume(
@@ -267,9 +284,11 @@ class VolumeInitHostTestCase(base.BaseVolumeTestCase):
         group_include_mock.assert_called_once_with(mock.ANY, cluster,
                                                    host=self.volume.host)
         vol_get_all_mock.assert_called_once_with(
-            mock.ANY, filters={'cluster_name': cluster})
+            mock.ANY, filters={'cluster_name': cluster},
+            limit=None, offset=None)
         snap_get_all_mock.assert_called_once_with(
-            mock.ANY, filters={'cluster_name': cluster})
+            mock.ANY, filters={'cluster_name': cluster},
+            limit=None, offset=None)
 
     @mock.patch('cinder.keymgr.migration.migrate_fixed_key')
     @mock.patch('cinder.volume.manager.VolumeManager._get_my_volumes')
@@ -280,9 +299,13 @@ class VolumeInitHostTestCase(base.BaseVolumeTestCase):
                                      mock_migrate_fixed_key):
 
         self.volume.init_host(service_id=self.service_id)
+
+        volumes = mock_get_my_volumes()
+        volumes_to_migrate = volume_migration.VolumeMigrationList()
+        volumes_to_migrate.append(volumes, self.context)
         mock_add_threadpool.assert_called_once_with(
             mock_migrate_fixed_key,
-            volumes=mock_get_my_volumes())
+            volumes=volumes_to_migrate)
 
     @mock.patch('time.sleep')
     def test_init_host_retry(self, mock_sleep):
diff --git a/cinder/tests/unit/volume/test_replication_manager.py b/cinder/tests/unit/volume/test_replication_manager.py
index 5e4eb93c381..9b50b7ead8b 100644
--- a/cinder/tests/unit/volume/test_replication_manager.py
+++ b/cinder/tests/unit/volume/test_replication_manager.py
@@ -72,7 +72,8 @@ class ReplicationTestCase(base.BaseVolumeTestCase):
         self.manager.failover_host(self.context, new_backend)
 
         mock_getall.assert_called_once_with(self.context,
-                                            filters={'host': self.host})
+                                            filters={'host': self.host},
+                                            limit=None, offset=None)
         mock_failover.assert_called_once_with(self.context,
                                               [],
                                               secondary_id=new_backend,
diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py
index 889b54dbdbf..ce768069917 100644
--- a/cinder/volume/manager.py
+++ b/cinder/volume/manager.py
@@ -83,6 +83,7 @@ from cinder.volume.flows.manager import manage_existing_snapshot
 from cinder.volume import group_types
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as vol_utils
+from cinder.volume import volume_migration as volume_migration
 from cinder.volume import volume_types
 
 LOG = logging.getLogger(__name__)
@@ -129,6 +130,13 @@ volume_manager_opts = [
                help='Maximum times to reintialize the driver '
                     'if volume initialization fails. The interval of retry is '
                     'exponentially backoff, and will be 1s, 2s, 4s etc.'),
+    cfg.IntOpt('init_host_max_objects_retrieval',
+               default=0,
+               help='Max number of volumes and snapshots to be retrieved '
+                    'per batch during volume manager host initialization. '
+                    'Query results will be obtained in batches from the '
+                    'database and not in one shot to avoid extreme memory '
+                    'usage. Set 0 to turn off this functionality.'),
 ]
 
 volume_backend_opts = [
@@ -462,36 +470,83 @@ class VolumeManager(manager.CleanableManager,
         # Initialize backend capabilities list
         self.driver.init_capabilities()
 
-        volumes = self._get_my_volumes(ctxt)
-        snapshots = self._get_my_snapshots(ctxt)
-        self._sync_provider_info(ctxt, volumes, snapshots)
-        # FIXME volume count for exporting is wrong
-
+        # Zero stats
         self.stats['pools'] = {}
         self.stats.update({'allocated_capacity_gb': 0})
 
-        try:
-            for volume in volumes:
-                # available volume should also be counted into allocated
-                if volume['status'] in ['in-use', 'available']:
-                    # calculate allocated capacity for driver
-                    self._count_allocated_capacity(ctxt, volume)
+        # Batch retrieval volumes and snapshots
 
-                    try:
-                        if volume['status'] in ['in-use']:
-                            self.driver.ensure_export(ctxt, volume)
-                    except Exception:
-                        LOG.exception("Failed to re-export volume, "
-                                      "setting to ERROR.",
-                                      resource=volume)
-                        volume.conditional_update({'status': 'error'},
-                                                  {'status': 'in-use'})
-        # All other cleanups are processed by parent class CleanableManager
+        num_vols, num_snaps, max_objs_num, req_range = None, None, None, [0]
+        req_limit = CONF.init_host_max_objects_retrieval
+        use_batch_objects_retrieval = req_limit > 0
 
-        except Exception:
-            LOG.exception("Error during re-export on driver init.",
-                          resource=volume)
-            return
+        if use_batch_objects_retrieval:
+            # Get total number of volumes
+            num_vols, __, __ = self._get_my_volumes_summary(ctxt)
+            # Get total number of snapshots
+            num_snaps, __ = self._get_my_snapshots_summary(ctxt)
+            # Calculate highest number of the objects (volumes or snapshots)
+            max_objs_num = max(num_vols, num_snaps)
+            # Make batch request loop counter
+            req_range = range(0, max_objs_num, req_limit)
+
+        volumes_to_migrate = volume_migration.VolumeMigrationList()
+
+        for req_offset in req_range:
+
+            # Retrieve 'req_limit' number of objects starting from
+            # 'req_offset' position
+            volumes, snapshots = None, None
+            if use_batch_objects_retrieval:
+                if req_offset < num_vols:
+                    volumes = self._get_my_volumes(ctxt,
+                                                   limit=req_limit,
+                                                   offset=req_offset)
+                else:
+                    volumes = objects.VolumeList()
+                if req_offset < num_snaps:
+                    snapshots = self._get_my_snapshots(ctxt,
+                                                       limit=req_limit,
+                                                       offset=req_offset)
+                else:
+                    snapshots = objects.SnapshotList()
+            # or retrieve all volumes and snapshots per single request
+            else:
+                volumes = self._get_my_volumes(ctxt)
+                snapshots = self._get_my_snapshots(ctxt)
+
+            self._sync_provider_info(ctxt, volumes, snapshots)
+            # FIXME volume count for exporting is wrong
+
+            try:
+                for volume in volumes:
+                    # available volume should also be counted into allocated
+                    if volume['status'] in ['in-use', 'available']:
+                        # calculate allocated capacity for driver
+                        self._count_allocated_capacity(ctxt, volume)
+
+                        try:
+                            if volume['status'] in ['in-use']:
+                                self.driver.ensure_export(ctxt, volume)
+                        except Exception:
+                            LOG.exception("Failed to re-export volume, "
+                                          "setting to ERROR.",
+                                          resource=volume)
+                            volume.conditional_update({'status': 'error'},
+                                                      {'status': 'in-use'})
+            # All other cleanups are processed by parent class -
+            # CleanableManager
+
+            except Exception:
+                LOG.exception("Error during re-export on driver init.",
+                              resource=volume)
+                return
+
+            if len(volumes):
+                volumes_to_migrate.append(volumes, ctxt)
+
+            del volumes
+            del snapshots
 
         self.driver.set_throttle()
 
@@ -507,7 +562,7 @@ class VolumeManager(manager.CleanableManager,
         # Migrate any ConfKeyManager keys based on fixed_key to the currently
         # configured key manager.
         self._add_to_threadpool(key_migration.migrate_fixed_key,
-                                volumes=volumes)
+                                volumes=volumes_to_migrate)
 
         # collect and publish service capabilities
         self.publish_service_capabilities(ctxt)
@@ -2923,15 +2978,27 @@ class VolumeManager(manager.CleanableManager,
             filters = {'host': self.host}
         return filters
 
-    def _get_my_resources(self, ctxt, ovo_class_list):
+    def _get_my_volumes_summary(self, ctxt):
         filters = self._get_cluster_or_host_filters()
-        return getattr(ovo_class_list, 'get_all')(ctxt, filters=filters)
+        return objects.VolumeList.get_volume_summary(ctxt, False, filters)
 
-    def _get_my_volumes(self, ctxt):
-        return self._get_my_resources(ctxt, objects.VolumeList)
+    def _get_my_snapshots_summary(self, ctxt):
+        filters = self._get_cluster_or_host_filters()
+        return objects.SnapshotList.get_snapshot_summary(ctxt, False, filters)
 
-    def _get_my_snapshots(self, ctxt):
-        return self._get_my_resources(ctxt, objects.SnapshotList)
+    def _get_my_resources(self, ctxt, ovo_class_list, limit=None, offset=None):
+        filters = self._get_cluster_or_host_filters()
+        return getattr(ovo_class_list, 'get_all')(ctxt, filters=filters,
+                                                  limit=limit,
+                                                  offset=offset)
+
+    def _get_my_volumes(self, ctxt, limit=None, offset=None):
+        return self._get_my_resources(ctxt, objects.VolumeList,
+                                      limit, offset)
+
+    def _get_my_snapshots(self, ctxt, limit=None, offset=None):
+        return self._get_my_resources(ctxt, objects.SnapshotList,
+                                      limit, offset)
 
     def get_manageable_volumes(self, ctxt, marker, limit, offset, sort_keys,
                                sort_dirs, want_objects=False):
diff --git a/cinder/volume/volume_migration.py b/cinder/volume/volume_migration.py
new file mode 100644
index 00000000000..34ddcc311a5
--- /dev/null
+++ b/cinder/volume/volume_migration.py
@@ -0,0 +1,72 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from cinder import db
+from cinder import objects
+
+
+class VolumeMigration(object):
+    """Lightweight Volume Migration object.
+
+    Will be used by KeyMigrator instead of regular Volume object to avoid
+    extra memory usage.
+ """ + + @staticmethod + def from_volume(volume, context): + volume_migration = VolumeMigration(volume.id, + volume.user_id, + volume.encryption_key_id) + volume_migration._context = context + return volume_migration + + def __init__(self, id, user_id, encryption_key_id): + self.id = id + self.user_id = user_id + self.orig_encryption_key_id = encryption_key_id + self.encryption_key_id = encryption_key_id + + def _get_updates(self): + updates = {} + if self.orig_encryption_key_id != self.encryption_key_id: + updates['encryption_key_id'] = self.encryption_key_id + return updates + + def _reset_changes(self): + self.orig_encryption_key_id = self.encryption_key_id + + def save(self): + updates = self._get_updates() + if updates: + db.volume_update(self._context, self.id, updates) + self._reset_changes() + + def __str__(self): + return 'id = {}'.format(self.id) + + def __repr__(self): + return self.__str__() + + +class VolumeMigrationList(list): + + def __init__(self): + list.__init__(self) + + def append(self, volumes, context): + + if not isinstance(volumes, objects.volume.VolumeList): + return + + for volume in volumes: + volume_migration = VolumeMigration.from_volume(volume, context) + super(VolumeMigrationList, self).append(volume_migration) diff --git a/releasenotes/notes/volume_init_max_objects_retrieval-966f607c46190946.yaml b/releasenotes/notes/volume_init_max_objects_retrieval-966f607c46190946.yaml new file mode 100644 index 00000000000..29b8bdf18ac --- /dev/null +++ b/releasenotes/notes/volume_init_max_objects_retrieval-966f607c46190946.yaml @@ -0,0 +1,9 @@ +--- +upgrade: + - Volume Manager now uses the configuration option + ``init_host_max_objects`` retrieval to set max number of + volumes and snapshots to be retrieved per batch during + volume manager host initialization. Query results will + be obtained in batches from the database and not in one + shot to avoid extreme memory usage. + Default value is 0 and disables this functionality. \ No newline at end of file