diff --git a/glance/common/wsgi.py b/glance/common/wsgi.py index 448efd8bcf..5fe3a39efc 100644 --- a/glance/common/wsgi.py +++ b/glance/common/wsgi.py @@ -54,6 +54,8 @@ from glance.common import config from glance.common import exception from glance.common import store_utils from glance.common import utils +import glance.db +from glance import housekeeping from glance import i18n from glance.i18n import _, _LE, _LI, _LW @@ -501,14 +503,18 @@ class BaseServer(object): self.default_port = default_port self.configure() self.start_wsgi() + + cleaner = housekeeping.StagingStoreCleaner(glance.db.get_api()) + self.pool.spawn_n(cleaner.clean_orphaned_staging_residue) + if self.initialize_prefetcher: self.cache_images() def start_wsgi(self): workers = get_num_workers() + self.pool = self.create_pool() if workers == 0: # Useful for profiling, test, debug etc. - self.pool = self.create_pool() self.pool.spawn_n(self._single_run, self.application, self.sock) return else: diff --git a/glance/common/wsgi_app.py b/glance/common/wsgi_app.py index d91952e501..3f35dcc30b 100644 --- a/glance/common/wsgi_app.py +++ b/glance/common/wsgi_app.py @@ -12,6 +12,7 @@ import atexit import os +import threading import glance_store from oslo_config import cfg @@ -22,6 +23,7 @@ from glance.api import common import glance.async_ from glance.common import config from glance.common import store_utils +from glance import housekeeping from glance.i18n import _ from glance import notifier @@ -79,6 +81,17 @@ def drain_threadpools(): pool_model.pool.shutdown() +def run_staging_cleanup(): + cleaner = housekeeping.StagingStoreCleaner(glance.db.get_api()) + # NOTE(danms): Start thread as a daemon. It is still a + # single-shot, but this will not block our shutdown if it is + # running. + cleanup_thread = threading.Thread( + target=cleaner.clean_orphaned_staging_residue, + daemon=True) + cleanup_thread.start() + + def init_app(): config.set_config_defaults() config_files = _get_config_files() @@ -111,5 +124,7 @@ def init_app(): glance_store.create_stores(CONF) glance_store.verify_default_store() + run_staging_cleanup() + _setup_os_profiler() return config.load_paste_app('glance-api') diff --git a/glance/housekeeping.py b/glance/housekeeping.py new file mode 100644 index 0000000000..19efb0f299 --- /dev/null +++ b/glance/housekeeping.py @@ -0,0 +1,126 @@ +# Copyright 2021 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import uuidutils + +from glance.common import exception +from glance.common import store_utils +from glance import context +from glance.i18n import _LE + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +def staging_store_path(): + """Return the local path to the staging store. + + :raises: GlanceException if staging store is not configured to be + a file:// URI + """ + if CONF.enabled_backends: + separator, staging_dir = store_utils.get_dir_separator() + else: + staging_dir = CONF.node_staging_uri + expected_prefix = 'file://' + if not staging_dir.startswith(expected_prefix): + raise exception.GlanceException( + 'Unexpected scheme in staging store; ' + 'unable to scan for residue') + return staging_dir[len(expected_prefix):] + + +class StagingStoreCleaner: + def __init__(self, db): + self.db = db + self.context = context.get_admin_context() + + @staticmethod + def get_image_id(filename): + if '.' in filename: + filename, ext = filename.split('.', 1) + if uuidutils.is_uuid_like(filename): + return filename + + def is_valid_image(self, image_id): + try: + image = self.db.image_get(self.context, image_id) + # FIXME(danms): Maybe check that it's not deleted or + # something else like state, size, etc + return not image['deleted'] + except exception.ImageNotFound: + return False + + @staticmethod + def delete_file(path): + try: + os.remove(path) + except FileNotFoundError: + # NOTE(danms): We must have raced with something else, so this + # is not a problem + pass + except Exception as e: + LOG.error(_LE('Failed to delete stale staging ' + 'path %(path)r: %(err)s'), + {'path': path, 'err': str(e)}) + return False + return True + + def clean_orphaned_staging_residue(self): + try: + files = os.listdir(staging_store_path()) + except FileNotFoundError: + # NOTE(danms): If we cannot list the staging dir, there is + # clearly nothing left from a previous run, so nothing to + # clean up. + files = [] + if not files: + return + + LOG.debug('Found %i files in staging directory for potential cleanup', + len(files)) + cleaned = ignored = error = 0 + for filename in files: + image_id = self.get_image_id(filename) + if not image_id: + # NOTE(danms): We should probably either have a config option + # that decides what to do here (i.e. reap or ignore), or decide + # that this is not okay and just nuke anything we find. + LOG.debug('Staging directory contains unexpected non-image ' + 'file %r; ignoring', + filename) + ignored += 1 + continue + if self.is_valid_image(image_id): + # NOTE(danms): We found a non-deleted image for this + # file, so leave it in place. + ignored += 1 + continue + path = os.path.join(staging_store_path(), filename) + LOG.debug('Stale staging residue found for image ' + '%(uuid)s: %(file)r; deleting now.', + {'uuid': image_id, 'file': path}) + if self.delete_file(path): + cleaned += 1 + else: + error += 1 + + LOG.debug('Cleaned %(cleaned)i stale staging files, ' + '%(ignored)i ignored (%(error)i errors)', + {'cleaned': cleaned, 'ignored': ignored, 'error': error}) diff --git a/glance/tests/functional/test_wsgi.py b/glance/tests/functional/test_wsgi.py index 859b6130a8..0c7ec8e527 100644 --- a/glance/tests/functional/test_wsgi.py +++ b/glance/tests/functional/test_wsgi.py @@ -16,9 +16,14 @@ """Tests for `glance.wsgi`.""" import os +from six.moves import http_client as http import socket import time +from oslo_serialization import jsonutils +from oslo_utils.fixture import uuidsentinel as uuids +import requests + from glance.common import wsgi from glance.tests import functional @@ -57,3 +62,116 @@ class TestWSGIServer(functional.FunctionalTest): self.assertIn(greetings, get_request()) # Should fail - connection timed out so we get nothing from the server self.assertFalse(get_request(delay=1.1)) + + +class StagingCleanupBase: + def _url(self, path): + return 'http://127.0.0.1:%d%s' % (self.api_port, path) + + def _headers(self, custom_headers=None): + base_headers = { + 'X-Identity-Status': 'Confirmed', + 'X-Auth-Token': '932c5c84-02ac-4fe5-a9ba-620af0e2bb96', + 'X-User-Id': 'f9a41d13-0c13-47e9-bee2-ce4e8bfe958e', + 'X-Tenant-Id': uuids.tenant1, + 'X-Roles': 'member', + } + base_headers.update(custom_headers or {}) + return base_headers + + def test_clean_on_start(self): + staging = os.path.join(self.test_dir, 'staging') + + # Start the server + self.start_servers(**self.__dict__.copy()) + + # Create an image + path = self._url('/v2/images') + headers = self._headers({'content-type': 'application/json'}) + data = jsonutils.dumps({'name': 'image-1', 'type': 'kernel', + 'disk_format': 'aki', + 'container_format': 'aki'}) + response = requests.post(path, headers=headers, data=data) + self.assertEqual(http.CREATED, response.status_code) + image = jsonutils.loads(response.text) + image_id = image['id'] + + # Stage data for the image + path = self._url('/v2/images/%s/stage' % image_id) + headers = self._headers({'Content-Type': 'application/octet-stream'}) + image_data = b'ZZZZZ' + response = requests.put(path, headers=headers, data=image_data) + self.assertEqual(http.NO_CONTENT, response.status_code) + + # Stop the server + self.my_api_server.stop() + + # Create more files in staging, one unrecognized one, and one + # uuid that matches nothing in the database, and some residue + # like we would see from failed conversions and decompressions + # for the image we created above. + open(os.path.join(staging, 'foo'), 'w') + open(os.path.join(staging, uuids.stale), 'w') + open(os.path.join(staging, uuids.converting), 'w') + converting_fn = os.path.join(staging, '%s.qcow2' % uuids.converting) + decompressing_fn = os.path.join(staging, '%s.uc' % uuids.decompressing) + open(converting_fn, 'w') + open(decompressing_fn, 'w') + + # Restart the server. We set "needs_database" to False here to avoid + # recreating the database during startup, thus causing the server to + # think there are no valid images and deleting everything. + self.my_api_server.needs_database = False + self.start_with_retry(self.my_api_server, + 'api_port', 3, **self.__dict__.copy()) + + # Poll to give it time to come up and do the work. Use the presence + # of the extra files to determine if the cleanup has run yet. + for i in range(0, 10): + try: + requests.get(self._url('/v2/images')) + except Exception: + # Not even answering queries yet + pass + else: + files = os.listdir(staging) + if len(files) == 2: + break + + time.sleep(1) + + # We should still find the not-an-image file... + self.assertTrue(os.path.exists(os.path.join(staging, 'foo'))) + # ...and make sure the actually-staged image file is still present.... + self.assertTrue(os.path.exists(os.path.join(staging, image_id))) + # ... but the stale image should be gone, + self.assertFalse(os.path.exists(os.path.join(staging, + uuids.stale))) + # ... along with the residue of the conversion ... + self.assertFalse(os.path.exists(converting_fn)) + # ... and the residue of the decompression. + self.assertFalse(os.path.exists(decompressing_fn)) + + self.stop_servers() + + +class TestStagingCleanupMultistore(functional.MultipleBackendFunctionalTest, + StagingCleanupBase): + """Test for staging store cleanup on API server startup. + + This tests the multistore case. + """ + def setUp(self): + super(TestStagingCleanupMultistore, self).setUp() + self.my_api_server = self.api_server_multiple_backend + + +class TestStagingCleanupSingleStore(functional.FunctionalTest, + StagingCleanupBase): + """Test for staging store cleanup on API server startup. + + This tests the single store case. + """ + def setUp(self): + super(TestStagingCleanupSingleStore, self).setUp() + self.my_api_server = self.api_server diff --git a/glance/tests/unit/common/test_wsgi_app.py b/glance/tests/unit/common/test_wsgi_app.py index f9e0a7687b..790562b51e 100644 --- a/glance/tests/unit/common/test_wsgi_app.py +++ b/glance/tests/unit/common/test_wsgi_app.py @@ -63,3 +63,17 @@ class TestWsgiAppInit(test_utils.BaseTestCase): # Make sure that shutdown() was called on the tasks_pool # ThreadPoolExecutor mock_shutdown.assert_called_once_with() + + @mock.patch('glance.async_._THREADPOOL_MODEL', new=None) + @mock.patch('glance.common.config.load_paste_app') + @mock.patch('glance.common.wsgi_app._get_config_files') + @mock.patch('threading.Thread') + @mock.patch('glance.housekeeping.StagingStoreCleaner') + def test_runs_staging_cleanup(self, mock_cleaner, mock_Thread, mock_conf, + mock_load): + mock_conf.return_value = [] + wsgi_app.init_app() + mock_Thread.assert_called_once_with( + target=mock_cleaner().clean_orphaned_staging_residue, + daemon=True) + mock_Thread.return_value.start.assert_called_once_with() diff --git a/glance/tests/unit/test_housekeeping.py b/glance/tests/unit/test_housekeeping.py new file mode 100644 index 0000000000..836481566a --- /dev/null +++ b/glance/tests/unit/test_housekeeping.py @@ -0,0 +1,235 @@ +# Copyright 2021 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +from unittest import mock + +import glance_store +from oslo_config import cfg +from oslo_utils.fixture import uuidsentinel as uuids + +from glance.common import exception +from glance import context +from glance import housekeeping +import glance.tests.unit.utils as unit_test_utils +import glance.tests.utils as test_utils + +CONF = cfg.CONF + + +class TestStagingStoreHousekeeping(test_utils.BaseTestCase): + def _store_dir(self, store): + return os.path.join(self.test_dir, store) + + def setUp(self): + super(TestStagingStoreHousekeeping, self).setUp() + + self.config(enabled_backends={'store1': 'file'}) + glance_store.register_store_opts( + CONF, + reserved_stores={'os_glance_staging_store': 'file'}) + + self.config(default_backend='store1', + group='glance_store') + self.config(filesystem_store_datadir=self._store_dir('store1'), + group='store1') + self.config(filesystem_store_datadir=self._store_dir('staging'), + group='os_glance_staging_store') + + glance_store.create_multi_stores( + CONF, + reserved_stores={'os_glance_staging_store': 'file'}) + + self.db = unit_test_utils.FakeDB(initialize=False) + self.cleaner = housekeeping.StagingStoreCleaner(self.db) + self.context = context.get_admin_context() + + def test_get_staging_path(self): + expected = os.path.join(self.test_dir, 'staging') + self.assertEqual(expected, housekeeping.staging_store_path()) + + def test_get_staging_path_single_store(self): + self.config(enabled_backends={}) + expected = '/tmp/staging/' + self.assertEqual(expected, housekeeping.staging_store_path()) + + @mock.patch('glance.common.store_utils.get_dir_separator') + def test_assert_staging_scheme(self, mock_get_dir_separator): + # NOTE(danms): This cannot happen now, but since we need to be + # opinionated about the fact that the URL is a file path, better + # to check for it, in case it changes in the future. + mock_get_dir_separator.return_value = ('/', 'http://foo') + self.assertRaises(exception.GlanceException, + lambda: housekeeping.staging_store_path()) + + def test_assert_staging_scheme_on_init(self): + # NOTE(danms): Make this a single-store scenario, which will cover + # our assertion about node_staging_uri while we test for the + # assert-on-init behavior. + self.config(enabled_backends={}, + node_staging_uri='http://good.luck') + self.assertRaises(exception.GlanceException, + housekeeping.staging_store_path) + + def test_get_image_id(self): + self.assertEqual(uuids.some_random_uuid, + self.cleaner.get_image_id(uuids.some_random_uuid)) + self.assertEqual(uuids.some_random_uuid, + self.cleaner.get_image_id( + '%s.qcow2' % uuids.some_random_uuid)) + self.assertEqual(uuids.some_random_uuid, + self.cleaner.get_image_id( + '%s.uc' % uuids.some_random_uuid)) + self.assertEqual(uuids.some_random_uuid, + self.cleaner.get_image_id( + '%s.blah' % uuids.some_random_uuid)) + + self.assertIsNone(self.cleaner.get_image_id('foo')) + self.assertIsNone(self.cleaner.get_image_id('foo.bar')) + + def test_is_valid_image(self): + image = self.db.image_create(self.context, {'status': 'queued'}) + self.assertTrue(self.cleaner.is_valid_image(image['id'])) + self.assertFalse(self.cleaner.is_valid_image('foo')) + + def test_is_valid_image_deleted(self): + image = self.db.image_create(self.context, {'status': 'queued'}) + self.db.image_destroy(self.context, image['id']) + self.assertFalse(self.cleaner.is_valid_image(image['id'])) + + @mock.patch('os.remove') + def test_delete_file(self, mock_remove): + self.assertTrue(self.cleaner.delete_file('foo')) + os.remove.assert_called_once_with('foo') + + @mock.patch('os.remove') + @mock.patch.object(housekeeping, 'LOG') + def test_delete_file_not_found(self, mock_LOG, mock_remove): + os.remove.side_effect = FileNotFoundError('foo is gone') + # We should ignore a file-not-found error + self.assertTrue(self.cleaner.delete_file('foo')) + os.remove.assert_called_once_with('foo') + mock_LOG.error.assert_not_called() + + @mock.patch('os.remove') + @mock.patch.object(housekeeping, 'LOG') + def test_delete_file_failed(self, mock_LOG, mock_remove): + # Any other error should report failure and log + os.remove.side_effect = Exception('insufficient plutonium') + self.assertFalse(self.cleaner.delete_file('foo')) + os.remove.assert_called_once_with('foo') + mock_LOG.error.assert_called_once_with( + 'Failed to delete stale staging path %(path)r: %(err)s', + {'path': 'foo', 'err': 'insufficient plutonium'}) + + @mock.patch('os.listdir') + @mock.patch('os.remove') + @mock.patch.object(housekeeping, 'LOG') + def test_clean_orphaned_staging_residue_empty(self, mock_LOG, mock_remove, + mock_listdir): + mock_listdir.return_value = [] + self.cleaner.clean_orphaned_staging_residue() + mock_listdir.assert_called_once_with(housekeeping.staging_store_path()) + mock_remove.assert_not_called() + mock_LOG.assert_not_called() + + @mock.patch('os.remove') + @mock.patch('os.listdir') + @mock.patch.object(housekeeping, 'LOG') + def test_clean_orphaned_staging_residue(self, mock_LOG, mock_listdir, + mock_remove): + staging = housekeeping.staging_store_path() + + image = self.db.image_create(self.context, {'status': 'queued'}) + + mock_listdir.return_value = ['notanimageid', image['id'], uuids.stale, + uuids.midconvert, + '%s.qcow2' % uuids.midconvert] + self.cleaner.clean_orphaned_staging_residue() + + # NOTE(danms): We should have deleted the stale image file + expected_stale = os.path.join(staging, uuids.stale) + + # NOTE(danms): We should have deleted the mid-convert base image and + # the target file + expected_mc = os.path.join(staging, uuids.midconvert) + expected_mc_target = os.path.join(staging, + '%s.qcow2' % uuids.midconvert) + + mock_remove.assert_has_calls([ + mock.call(expected_stale), + mock.call(expected_mc), + mock.call(expected_mc_target), + ]) + + # NOTE(danms): We should have cleaned the one (which we os.remove()'d) + # above, and ignore the invalid and active ones. No errors this time. + mock_LOG.debug.assert_has_calls([ + mock.call('Found %i files in staging directory for potential ' + 'cleanup', 5), + mock.call('Staging directory contains unexpected non-image file ' + '%r; ignoring', + 'notanimageid'), + mock.call('Stale staging residue found for image %(uuid)s: ' + '%(file)r; deleting now.', + {'uuid': uuids.stale, 'file': expected_stale}), + mock.call('Stale staging residue found for image %(uuid)s: ' + '%(file)r; deleting now.', + {'uuid': uuids.midconvert, 'file': expected_mc}), + mock.call('Stale staging residue found for image %(uuid)s: ' + '%(file)r; deleting now.', + {'uuid': uuids.midconvert, 'file': expected_mc_target}), + mock.call('Cleaned %(cleaned)i stale staging files, ' + '%(ignored)i ignored (%(error)i errors)', + {'cleaned': 3, 'ignored': 2, 'error': 0}), + ]) + + @mock.patch('os.listdir') + @mock.patch('os.remove') + @mock.patch.object(housekeeping, 'LOG') + def test_clean_orphaned_staging_residue_handles_errors(self, mock_LOG, + mock_remove, + mock_listdir): + staging = housekeeping.staging_store_path() + + mock_listdir.return_value = [uuids.gone, uuids.error] + mock_remove.side_effect = [FileNotFoundError('gone'), + PermissionError('not yours')] + self.cleaner.clean_orphaned_staging_residue() + + # NOTE(danms): We should only have logged an error for the + # permission failure + mock_LOG.error.assert_called_once_with( + 'Failed to delete stale staging path %(path)r: %(err)s', + {'path': os.path.join(staging, uuids.error), + 'err': 'not yours'}) + + # NOTE(danms): We should report the permission failure as an error, + # but not the already-gone or invalid ones. + mock_LOG.debug.assert_has_calls([ + mock.call('Found %i files in staging directory for potential ' + 'cleanup', 2), + mock.call('Stale staging residue found for image %(uuid)s: ' + '%(file)r; deleting now.', + {'uuid': uuids.gone, + 'file': os.path.join(staging, uuids.gone)}), + mock.call('Stale staging residue found for image %(uuid)s: ' + '%(file)r; deleting now.', + {'uuid': uuids.error, + 'file': os.path.join(staging, uuids.error)}), + mock.call('Cleaned %(cleaned)i stale staging files, ' + '%(ignored)i ignored (%(error)i errors)', + {'cleaned': 1, 'ignored': 0, 'error': 1}), + ])