Merge "Refactor scrubber to remove use of eventlet"

This commit is contained in:
Zuul 2025-05-15 12:46:02 +00:00 committed by Gerrit Code Review
commit 9e8bf37c85
6 changed files with 170 additions and 259 deletions

View File

@ -22,9 +22,6 @@ Glance Scrub Service
import os
import sys
import eventlet
eventlet.patcher.monkey_patch()
import subprocess
# If ../glance/__init__.py exists, add ../ to Python search path, so that

View File

@ -14,9 +14,9 @@
# under the License.
import calendar
from concurrent import futures
import time
import eventlet
from glance_store import exceptions as store_exceptions
from oslo_config import cfg
from oslo_log import log as logging
@ -318,25 +318,28 @@ class Daemon(object):
"threads=%(threads)s"),
{'wakeup_time': wakeup_time, 'threads': threads})
self.wakeup_time = wakeup_time
self.event = eventlet.event.Event()
# This pool is used for periodic instantiation of scrubber
self.daemon_pool = eventlet.greenpool.GreenPool(threads)
self.executor = futures.ThreadPoolExecutor(max_workers=threads)
def start(self, application):
self._run(application)
def wait(self):
try:
self.event.wait()
while True:
# Keep the daemon alive; can be replaced with more
# sophisticated handling.
time.sleep(1)
except KeyboardInterrupt:
msg = _LI("Daemon Shutdown on KeyboardInterrupt")
LOG.info(msg)
LOG.info(_LI("Daemon Shutdown on KeyboardInterrupt"))
def _run(self, application):
    """Submit one scrubber pass and schedule the following one.

    ``application.run`` is handed to ``self.executor`` so the pass runs on
    a worker thread, and a timer is armed to call ``_run`` again after
    ``self.wakeup_time`` seconds. The method returns immediately, so
    ``start()`` returns and ``wait()`` can keep the process alive and
    handle ``KeyboardInterrupt``.

    :param application: object exposing a ``run()`` callable that performs
                        one scrub pass
    """
    # Local import: only this block needs it, and the module's import
    # section is not fully visible from here.
    import threading

    def _report_failure(done: futures.Future) -> None:
        # Runs once the pass has finished; surfaces an exception that
        # would otherwise stay hidden inside the Future object.
        if done.cancelled():
            return
        error = done.exception()
        if error is not None:
            LOG.error("Scrubber run failed: %s", error)

    LOG.debug("Running application")
    future: futures.Future = self.executor.submit(application.run)
    # add_done_callback() must receive a callable. The previous code passed
    # the *result* of self._run(application), which recursed immediately
    # (after sleeping) and never returned to the caller.
    future.add_done_callback(_report_failure)

    # Schedule next run
    LOG.debug("Next run scheduled in %s seconds", self.wakeup_time)
    timer = threading.Timer(self.wakeup_time, self._run,
                            args=(application,))
    # Daemon thread so a pending timer does not keep the process alive
    # once wait() has returned.
    timer.daemon = True
    timer.start()
class Scrubber(object):
@ -346,7 +349,8 @@ class Scrubber(object):
self.store_api = store_api
self.admin_context = context.get_admin_context(show_deleted=True)
self.db_queue = get_scrub_queue()
self.pool = eventlet.greenpool.GreenPool(CONF.scrub_pool_size)
self.executor = futures.ThreadPoolExecutor(
max_workers=CONF.scrub_pool_size)
def _get_delete_jobs(self):
try:
@ -378,31 +382,28 @@ class Scrubber(object):
delete_jobs = self._get_delete_jobs()
if delete_jobs:
list(self.pool.starmap(self._scrub_image, delete_jobs.items()))
self.executor.map(self._scrub_image, delete_jobs.items())
def _scrub_image(self, image_id, delete_jobs):
def _scrub_image(self, delete_jobs):
if len(delete_jobs) == 0:
return
image_id, loc_details = delete_jobs
LOG.info(_LI("Scrubbing image %(id)s from %(count)d locations."),
{'id': image_id, 'count': len(delete_jobs)})
success = True
if CONF.enabled_backends:
for img_id, loc_id, uri, backend in delete_jobs:
try:
self._delete_image_location_from_backend(img_id, loc_id,
uri,
backend=backend)
except Exception:
success = False
else:
for img_id, loc_id, uri in delete_jobs:
try:
self._delete_image_location_from_backend(img_id, loc_id,
uri)
except Exception:
success = False
for item in loc_details:
try:
if CONF.enabled_backends:
img_id, loc_id, uri, backend = item
self._delete_image_location_from_backend(
img_id, loc_id, uri, backend=backend)
else:
img_id, loc_id, uri = item
self._delete_image_location_from_backend(
img_id, loc_id, uri)
except Exception:
success = False
if success:
image = db_api.get_api().image_get(self.admin_context, image_id)
@ -420,7 +421,7 @@ class Scrubber(object):
try:
LOG.debug("Scrubbing image %s from a location.", image_id)
try:
if CONF.enabled_backends:
if backend:
self.store_api.delete(uri, backend, self.admin_context)
else:
self.store_api.delete_from_backend(uri, self.admin_context)

View File

@ -1452,6 +1452,41 @@ class SynchronousAPIBase(test_utils.BaseTestCase):
# we db_sync.
test_utils.db_sync(engine=engine)
def setup_scrubber_conf(self, daemon=False, wakeup_time=300,
                        scrub_pool_size=12):
    """Write ``glance-scrubber.conf`` into the test directory.

    The path of the generated file is stored on ``self.scrubber_conf``.
    The configuration enables three ``file`` backends (store1..store3),
    points the database at ``test.db`` inside ``self.test_dir`` and uses
    ``store1`` as the default backend.

    :param daemon: value written for the ``daemon`` option
    :param wakeup_time: value written for the ``wakeup_time`` option
    :param scrub_pool_size: value written for the ``scrub_pool_size``
                            option
    """
    conf_path = os.path.join(self.test_dir, 'glance-scrubber.conf')
    self.scrubber_conf = conf_path
    connection = 'sqlite:///%s/test.db' % self.test_dir

    # The template lines are kept flush-left inside the literal; dedent()
    # therefore leaves the text unchanged.
    template = textwrap.dedent("""
[DEFAULT]
enabled_backends=store1:file,store2:file,store3:file
daemon=%(daemon)s
wakeup_time=%(wakeup_time)s
scrub_pool_size=%(scrub_pool_size)s
[database]
connection=%(connection)s
[store1]
filesystem_store_datadir=%(store1)s
[store2]
filesystem_store_datadir=%(store2)s
[store3]
filesystem_store_datadir=%(store3)s
[os_glance_staging_dir]
filesystem_store_datadir=%(staging)s
[glance_store]
default_backend=store1
""")

    with open(conf_path, 'w') as conf_file:
        # Store directories are resolved through self._store_dir(), in the
        # same order as the keys are listed.
        conf_file.write(template % {
            "daemon": daemon,
            "wakeup_time": wakeup_time,
            "scrub_pool_size": scrub_pool_size,
            "connection": connection,
            "store1": self._store_dir('store1'),
            "store2": self._store_dir('store2'),
            "store3": self._store_dir('store3'),
            "staging": self._store_dir('staging'),
        })
def setup_simple_paste(self):
"""Setup a very simple no-auth paste pipeline.
@ -1553,6 +1588,28 @@ class SynchronousAPIBase(test_utils.BaseTestCase):
self.config(enforce_scope=True,
group='oslo_policy')
def start_scrubber(self, daemon=False, wakeup_time=300,
                   restore=None, raise_error=True):
    """Generate the scrubber config and launch ``glance.cmd.scrubber``.

    :param daemon: when true, ``--daemon`` is appended, the command is not
                   expected to exit, and ``execute`` is asked to return
                   the process object as well
    :param wakeup_time: forwarded to ``setup_scrubber_conf``
    :param restore: optional image id; when truthy, ``--restore <id>`` is
                    appended to the command line
    :param raise_error: forwarded to ``execute``
    :returns: whatever ``execute`` returns for the assembled command
    """
    self.setup_scrubber_conf(daemon=daemon, wakeup_time=wakeup_time)

    # Build the command line: interpreter + module, optional flags, then
    # the directory that holds the generated configuration file.
    command = f"{sys.executable} -m glance.cmd.scrubber"
    if restore:
        command += f" --restore {restore}"
    if daemon:
        command += " --daemon"
    command = f"{command} --config-dir {self.test_dir}"

    # A daemon keeps running, so no exit is expected and the caller needs
    # the process handle to terminate it later.
    return execute(command, raise_error=raise_error,
                   expect_exit=not daemon, return_process=daemon)
def _headers(self, custom_headers=None):
base_headers = {
'X-Identity-Status': 'Confirmed',

View File

@ -13,72 +13,29 @@
# License for the specific language governing permissions and limitations
# under the License.
import http.client
import os
import sys
import time
import httplib2
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils.fixture import uuidsentinel as uuids
from oslo_utils import units
from glance import context
import glance.db as db_api
from glance.tests import functional
from glance.tests import utils as test_utils
from glance.tests.utils import execute
CONF = cfg.CONF
class TestScrubber(functional.FunctionalTest):
class TestScrubber(functional.SynchronousAPIBase):
"""Test that delayed_delete works and the scrubber deletes"""
def setUp(self):
super(TestScrubber, self).setUp()
self.api_server.deployment_flavor = 'noauth'
self.admin_context = context.get_admin_context(show_deleted=True)
CONF.set_override('connection', self.api_server.sql_connection,
group='database')
def _headers(self, custom_headers=None):
base_headers = {
'X-Identity-Status': 'Confirmed',
'X-Auth-Token': '932c5c84-02ac-4fe5-a9ba-620af0e2bb96',
'X-User-Id': 'f9a41d13-0c13-47e9-bee2-ce4e8bfe958e',
'X-Tenant-Id': uuids.TENANT1,
'X-Roles': 'reader,member',
}
base_headers.update(custom_headers or {})
return base_headers
def _send_create_image_http_request(self, path, body=None):
headers = {
"Content-Type": "application/json",
"X-Roles": "admin",
}
body = body or {'container_format': 'ovf',
'disk_format': 'raw',
'name': 'test_image',
'visibility': 'public'}
body = jsonutils.dumps(body)
return httplib2.Http().request(path, 'POST', body,
self._headers(headers))
def _send_upload_image_http_request(self, path, body=None):
headers = {
"Content-Type": "application/octet-stream"
}
return httplib2.Http().request(path, 'PUT', body,
self._headers(headers))
def _send_http_request(self, path, method):
headers = {
"Content-Type": "application/json"
}
return httplib2.Http().request(path, method, None,
self._headers(headers))
def _get_pending_delete_image(self, image_id):
# In Glance V2, there is no way to get the 'pending_delete' image from
@ -93,260 +50,149 @@ class TestScrubber(functional.FunctionalTest):
test that images don't get deleted immediately and that the scrubber
scrubs them
"""
self.cleanup()
kwargs = self.__dict__.copy()
self.start_servers(delayed_delete=True, daemon=True,
metadata_encryption_key='', **kwargs)
path = "http://%s:%d/v2/images" % ("127.0.0.1", self.api_port)
response, content = self._send_create_image_http_request(path)
self.assertEqual(http.client.CREATED, response.status)
image = jsonutils.loads(content)
self.assertEqual('queued', image['status'])
self.config(delayed_delete=True)
self.start_server()
self.scrubber = self.start_scrubber(daemon=True, wakeup_time=2)[3]
file_path = "%s/%s/file" % (path, image['id'])
response, content = self._send_upload_image_http_request(file_path,
body='XXX')
self.assertEqual(http.client.NO_CONTENT, response.status)
data = test_utils.FakeData(1 * units.Mi)
image_id = self._create_and_upload(data_iter=data)
path = "%s/%s" % (path, image['id'])
response, content = self._send_http_request(path, 'GET')
image = jsonutils.loads(content)
image = self.api_get('/v2/images/%s' % image_id).json
self.assertEqual('active', image['status'])
response, content = self._send_http_request(path, 'DELETE')
self.assertEqual(http.client.NO_CONTENT, response.status)
path = '/v2/images/%s' % image_id
self.api_delete(path)
image = self._get_pending_delete_image(image['id'])
self.assertEqual('pending_delete', image['status'])
self.wait_for_scrub(image['id'])
self.stop_servers()
if self.scrubber:
self.wait_for_scrub(image['id'])
self.scrubber.terminate()
self.scrubber.wait()
# Give the scrubber some time to stop.
time.sleep(5)
def test_scrubber_app(self):
"""
test that the glance-scrubber script runs successfully when not in
daemon mode
"""
self.cleanup()
kwargs = self.__dict__.copy()
self.start_servers(delayed_delete=True, daemon=False,
metadata_encryption_key='', **kwargs)
path = "http://%s:%d/v2/images" % ("127.0.0.1", self.api_port)
response, content = self._send_create_image_http_request(path)
self.assertEqual(http.client.CREATED, response.status)
image = jsonutils.loads(content)
self.assertEqual('queued', image['status'])
self.config(delayed_delete=True)
self.start_server()
file_path = "%s/%s/file" % (path, image['id'])
response, content = self._send_upload_image_http_request(file_path,
body='XXX')
self.assertEqual(http.client.NO_CONTENT, response.status)
data = test_utils.FakeData(1 * units.Mi)
image_id = self._create_and_upload(data_iter=data)
path = "%s/%s" % (path, image['id'])
response, content = self._send_http_request(path, 'GET')
image = jsonutils.loads(content)
image = self.api_get('/v2/images/%s' % image_id).json
self.assertEqual('active', image['status'])
response, content = self._send_http_request(path, 'DELETE')
self.assertEqual(http.client.NO_CONTENT, response.status)
path = '/v2/images/%s' % image_id
self.api_delete(path)
image = self._get_pending_delete_image(image['id'])
self.assertEqual('pending_delete', image['status'])
# wait for the scrub time on the image to pass
time.sleep(self.api_server.scrub_time)
# scrub images and make sure they get deleted
exe_cmd = "%s -m glance.cmd.scrubber" % sys.executable
cmd = ("%s --config-file %s" %
(exe_cmd, self.scrubber_daemon.conf_file_name))
exitcode, out, err = execute(cmd, raise_error=False)
self.assertEqual(0, exitcode)
self.start_scrubber(daemon=False, wakeup_time=2)
self.wait_for_scrub(image['id'])
self.stop_servers()
def test_scrubber_delete_handles_exception(self):
"""
Test that the scrubber handles the case where an
exception occurs when _delete() is called. The scrubber
should not write out queue files in this case.
"""
# Start servers.
self.cleanup()
kwargs = self.__dict__.copy()
self.start_servers(delayed_delete=True, daemon=False,
default_store='file', **kwargs)
# Check that we are using a file backend.
self.assertEqual(self.api_server.default_store, 'file')
self.config(delayed_delete=True)
self.start_server()
# add an image
path = "http://%s:%d/v2/images" % ("127.0.0.1", self.api_port)
response, content = self._send_create_image_http_request(path)
self.assertEqual(http.client.CREATED, response.status)
image = jsonutils.loads(content)
self.assertEqual('queued', image['status'])
data = test_utils.FakeData(1 * units.Mi)
image_id = self._create_and_upload(data_iter=data)
file_path = "%s/%s/file" % (path, image['id'])
response, content = self._send_upload_image_http_request(file_path,
body='XXX')
self.assertEqual(http.client.NO_CONTENT, response.status)
path = "%s/%s" % (path, image['id'])
response, content = self._send_http_request(path, 'GET')
image = jsonutils.loads(content)
image = self.api_get('/v2/images/%s' % image_id).json
self.assertEqual('active', image['status'])
# delete the image
response, content = self._send_http_request(path, 'DELETE')
self.assertEqual(http.client.NO_CONTENT, response.status)
path = '/v2/images/%s' % image_id
self.api_delete(path)
# ensure the image is marked pending delete.
image = self._get_pending_delete_image(image['id'])
self.assertEqual('pending_delete', image['status'])
# Remove the file from the backend.
file_path = os.path.join(self.api_server.image_dir, image['id'])
store_path = os.path.join(self.test_dir, 'store1')
file_path = os.path.join(store_path, image['id'])
os.remove(file_path)
# Wait for the scrub time on the image to pass
time.sleep(self.api_server.scrub_time)
# run the scrubber app, and ensure it doesn't fall over
exe_cmd = "%s -m glance.cmd.scrubber" % sys.executable
cmd = ("%s --config-file %s" %
(exe_cmd, self.scrubber_daemon.conf_file_name))
exitcode, out, err = execute(cmd, raise_error=False)
self.assertEqual(0, exitcode)
self.start_scrubber(daemon=False, wakeup_time=2)
self.wait_for_scrub(image['id'])
self.stop_servers()
def test_scrubber_app_queue_errors_not_daemon(self):
"""
test that the glance-scrubber exits with an exit code > 0 when it
fails to lookup images, indicating a configuration error when not
in daemon mode.
Related-Bug: #1548289
"""
# Don't start the registry server to cause intended failure
# Don't start the api server to save time
exitcode, out, err = self.scrubber_daemon.start(
delayed_delete=True, daemon=False)
self.assertEqual(0, exitcode,
"Failed to spin up the Scrubber daemon. "
"Got: %s" % err)
# Run the Scrubber
exe_cmd = "%s -m glance.cmd.scrubber" % sys.executable
cmd = ("%s --config-file %s" %
(exe_cmd, self.scrubber_daemon.conf_file_name))
exitcode, out, err = execute(cmd, raise_error=False)
self.assertEqual(1, exitcode)
self.assertIn('Can not get scrub jobs from queue', str(err))
self.stop_server(self.scrubber_daemon)
def test_scrubber_restore_image(self):
self.cleanup()
kwargs = self.__dict__.copy()
self.start_servers(delayed_delete=True, daemon=False,
metadata_encryption_key='', **kwargs)
path = "http://%s:%d/v2/images" % ("127.0.0.1", self.api_port)
response, content = self._send_create_image_http_request(path)
self.assertEqual(http.client.CREATED, response.status)
image = jsonutils.loads(content)
self.assertEqual('queued', image['status'])
self.config(delayed_delete=True)
self.start_server()
file_path = "%s/%s/file" % (path, image['id'])
response, content = self._send_upload_image_http_request(file_path,
body='XXX')
self.assertEqual(http.client.NO_CONTENT, response.status)
# add an image
data = test_utils.FakeData(1 * units.Mi)
image_id = self._create_and_upload(data_iter=data)
path = "%s/%s" % (path, image['id'])
response, content = self._send_http_request(path, 'GET')
image = jsonutils.loads(content)
image = self.api_get('/v2/images/%s' % image_id).json
self.assertEqual('active', image['status'])
response, content = self._send_http_request(path, 'DELETE')
self.assertEqual(http.client.NO_CONTENT, response.status)
# delete the image
path = '/v2/images/%s' % image_id
self.api_delete(path)
# ensure the image is marked pending delete.
image = self._get_pending_delete_image(image['id'])
self.assertEqual('pending_delete', image['status'])
def _test_content():
exe_cmd = "%s -m glance.cmd.scrubber" % sys.executable
cmd = ("%s --config-file %s --restore %s" %
(exe_cmd, self.scrubber_daemon.conf_file_name, image['id']))
return execute(cmd, raise_error=False)
return self.start_scrubber(daemon=False, wakeup_time=2,
restore=image['id'])
exitcode, out, err = self.wait_for_scrubber_shutdown(_test_content)
exitcode, out, err = self.wait_for_scrubber_shutdown(
_test_content)
self.assertEqual(0, exitcode)
response, content = self._send_http_request(path, 'GET')
image = jsonutils.loads(content)
image = self.api_get('/v2/images/%s' % image_id).json
self.assertEqual('active', image['status'])
self.stop_servers()
def test_scrubber_restore_active_image_raise_error(self):
self.cleanup()
self.start_servers(delayed_delete=True, daemon=False,
metadata_encryption_key='')
self.config(delayed_delete=True)
self.start_server()
path = "http://%s:%d/v2/images" % ("127.0.0.1", self.api_port)
response, content = self._send_create_image_http_request(path)
self.assertEqual(http.client.CREATED, response.status)
image = jsonutils.loads(content)
self.assertEqual('queued', image['status'])
# add an image
data = test_utils.FakeData(1 * units.Mi)
image_id = self._create_and_upload(data_iter=data)
file_path = "%s/%s/file" % (path, image['id'])
response, content = self._send_upload_image_http_request(file_path,
body='XXX')
self.assertEqual(http.client.NO_CONTENT, response.status)
path = "%s/%s" % (path, image['id'])
response, content = self._send_http_request(path, 'GET')
image = jsonutils.loads(content)
image = self.api_get('/v2/images/%s' % image_id).json
self.assertEqual('active', image['status'])
def _test_content():
exe_cmd = "%s -m glance.cmd.scrubber" % sys.executable
cmd = ("%s --config-file %s --restore %s" %
(exe_cmd, self.scrubber_daemon.conf_file_name, image['id']))
return execute(cmd, raise_error=False)
return self.start_scrubber(daemon=False, wakeup_time=2,
restore=image['id'], raise_error=False)
exitcode, out, err = self.wait_for_scrubber_shutdown(_test_content)
exitcode, out, err = self.wait_for_scrubber_shutdown(
_test_content)
self.assertEqual(1, exitcode)
self.assertIn('cannot restore the image from active to active '
'(wanted from_state=pending_delete)', str(err))
self.stop_servers()
def test_scrubber_restore_image_non_exist(self):
def _test_content():
scrubber = functional.ScrubberDaemon(self.test_dir,
self.policy_file)
scrubber.write_conf(daemon=False)
scrubber.needs_database = True
scrubber.create_database()
exe_cmd = "%s -m glance.cmd.scrubber" % sys.executable
cmd = ("%s --config-file %s --restore fake_image_id" %
(exe_cmd, scrubber.conf_file_name))
return execute(cmd, raise_error=False)
return self.start_scrubber(
daemon=False, wakeup_time=2, restore='fake_image_id',
raise_error=False)
exitcode, out, err = self.wait_for_scrubber_shutdown(_test_content)
exitcode, out, err = self.wait_for_scrubber_shutdown(
_test_content)
self.assertEqual(1, exitcode)
self.assertIn('No image found with ID fake_image_id', str(err))
def test_scrubber_restore_image_with_daemon_raise_error(self):
exe_cmd = "%s -m glance.cmd.scrubber" % sys.executable
cmd = ("%s --daemon --restore fake_image_id" % exe_cmd)
exitcode, out, err = execute(cmd, raise_error=False)
@ -356,8 +202,7 @@ class TestScrubber(functional.FunctionalTest):
'together', str(err))
def test_scrubber_restore_image_with_daemon_running(self):
self.cleanup()
self.scrubber_daemon.start(daemon=True)
self.scrubber = self.start_scrubber(daemon=True, wakeup_time=2)[3]
# Give the scrubber some time to start.
time.sleep(5)
@ -367,7 +212,12 @@ class TestScrubber(functional.FunctionalTest):
self.assertEqual(1, exitcode)
self.assertIn('glance-scrubber is already running', str(err))
self.stop_server(self.scrubber_daemon)
# terminate daemon process
if self.scrubber:
self.scrubber.terminate()
self.scrubber.wait()
# Give the scrubber some time to stop.
time.sleep(5)
def wait_for_scrubber_shutdown(self, func):
# NOTE(wangxiyuan, rosmaita): The image-restore functionality contains

View File

@ -60,7 +60,7 @@ class TestScrubber(test_utils.BaseTestCase):
scrub = scrubber.Scrubber(glance_store)
with patch.object(glance_store,
"delete_from_backend"):
scrub._scrub_image(id, [(id, '-', uri)])
scrub._scrub_image((id, [(id, '-', uri)]))
@mock.patch.object(db_api, "image_get")
def test_store_delete_store_exceptions(self, mock_image_get):
@ -76,7 +76,7 @@ class TestScrubber(test_utils.BaseTestCase):
with patch.object(glance_store,
"delete_from_backend") as _mock_delete:
_mock_delete.side_effect = ex
scrub._scrub_image(id, [(id, '-', uri)])
scrub._scrub_image((id, [(id, '-', uri)]))
@mock.patch.object(db_api, "image_get")
def test_store_delete_notfound_exception(self, mock_image_get):
@ -90,7 +90,7 @@ class TestScrubber(test_utils.BaseTestCase):
with patch.object(glance_store,
"delete_from_backend") as _mock_delete:
_mock_delete.side_effect = ex
scrub._scrub_image(id, [(id, '-', uri)])
scrub._scrub_image((id, [(id, '-', uri)]))
def test_scrubber_exits(self):
# Checks for Scrubber exits when it is not able to fetch jobs from

View File

@ -332,7 +332,8 @@ def execute(cmd,
exec_env=None,
expect_exit=True,
expected_exitcode=0,
context=None):
context=None,
return_process=False):
"""
Executes a command in a subprocess. Returns a tuple
of (exitcode, out, err), where out is the string output
@ -350,6 +351,7 @@ def execute(cmd,
:param expect_exit: Optional flag true iff timely exit is expected
:param expected_exitcode: expected exitcode from the launcher
:param context: additional context for error message
:param return_process: return process to terminate explicitly
"""
env = os.environ.copy()
@ -402,6 +404,10 @@ def execute(cmd,
if context:
msg += "\n\nCONTEXT: %s" % context
raise RuntimeError(msg)
if return_process:
return exitcode, out, err, process
return exitcode, out, err