Add task cancellation tracker utility

This module provides functions to register, cancel, and signal cancellation
of long-running tasks using lock files and external locking. It ensures
atomic operation registration, safe cancellation, and proper cleanup,
supporting concurrent task management.

Change-Id: Ic58233302001a2396e84400ef7081afa51748a2a
This commit is contained in:
Abhishek Kekane
2025-05-27 20:00:14 +00:00
parent dac9741c2e
commit 1311d5fe2f
3 changed files with 213 additions and 1 deletions

View File

@@ -0,0 +1,101 @@
# Copyright 2025 RedHat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import time
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
from glance.common import exception
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
def get_data_dir():
"""Return the filesystem store data directory from config."""
if CONF.enabled_backends:
return CONF.os_glance_tasks_store.filesystem_store_datadir
else:
# NOTE(abhishekk): strip the 'file://' prefix from the URI
return CONF.node_staging_uri[7:]
def path_for_op(operation_id, prefix='running-task-'):
"""Construct the file path for a given operation ID."""
return os.path.join(get_data_dir(), "%s%s" % (prefix, operation_id))
def is_canceled(operation_id):
"""
Check if the operation has been canceled (file exists and
is nonzero length).
"""
operation_path = path_for_op(operation_id)
return os.path.exists(operation_path) and os.path.getsize(
operation_path) > 0
def register_operation(operation_id):
"""Register a new operation by creating a lock file."""
with lockutils.external_lock('tasks'):
operation_path = path_for_op(operation_id)
try:
# Use os.open with O_CREAT | O_EXCL to ensure atomic creation
fd = os.open(operation_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.close(fd)
except FileExistsError:
# Handle the case where the lock file already exists
raise RuntimeError(f"Operation {operation_id} is "
f"already registered.")
def cancel_operation(operation_id):
"""
Mark an operation as canceled by writing to the lock file if it exists.
"""
with lockutils.external_lock('tasks'):
operation_path = path_for_op(operation_id)
if not os.path.exists(operation_path):
raise exception.ServerError(
"Operation file for %s does not exist, cannot cancel.",
operation_id)
with open(operation_path, 'w') as f:
f.write(str(operation_id))
# Wait for the system to acknowledge the cancellation
for _ in range(60):
if os.path.exists(operation_path):
time.sleep(0.5)
continue
return
# If still not canceled after timeout, raise an exception
raise exception.ServerError("Timeout canceling in-progress "
"task %s" % operation_id)
def signal_finished(operation_id):
"""Remove the lock file to signal that the operation is canceled."""
with lockutils.external_lock('tasks'):
operation_path = path_for_op(operation_id)
try:
os.remove(operation_path)
except FileNotFoundError:
LOG.warning("Attempted to signal finished for operation %s, "
"but the operation file does not "
"exist.", operation_path)

View File

@@ -87,11 +87,14 @@ class MultiStoreClearingUnitTest(test_utils.BaseTestCase):
'readonly_store': 'http',
'fast-cinder': 'cinder',
'fast-rbd': 'rbd', 'reliable': 'swift'})
store.register_store_opts(CONF)
store.register_store_opts(
CONF, reserved_stores={'os_glance_tasks_store': 'file'})
self.config(default_backend='fast',
group='glance_store')
self.config(filesystem_store_datadir=self.test_dir,
group='os_glance_tasks_store')
self.config(filesystem_store_datadir=self.test_dir,
filesystem_thin_provisioning=False,
filesystem_store_chunk_size=65536,

View File

@@ -0,0 +1,108 @@
# Copyright 2025 RedHat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from unittest import mock
from oslo_concurrency import lockutils
from glance.common import exception
from glance import task_cancellation_tracker as tracker
from glance.tests.unit import base
class TestTaskCancellationTracker(base.MultiStoreClearingUnitTest):
def setUp(self):
super(TestTaskCancellationTracker, self).setUp()
def test_get_data_dir(self):
self.assertEqual(tracker.get_data_dir(), self.test_dir)
def test_path_for_op(self):
op_id = '123'
expected = os.path.join(self.test_dir, "%s%s" % (
"running-task-", op_id))
self.assertEqual(tracker.path_for_op(op_id), expected)
def test_register_and_is_canceled(self):
op_id = 'op1'
tracker.register_operation(op_id)
self.assertFalse(tracker.is_canceled(op_id))
# File should exist but be zero-length
path = tracker.path_for_op(op_id)
self.assertTrue(os.path.exists(path))
self.assertEqual(os.path.getsize(path), 0)
def test_signal_finished(self):
op_id = 'op2'
tracker.register_operation(op_id)
tracker.signal_finished(op_id)
self.assertFalse(os.path.exists(tracker.path_for_op(op_id)))
@mock.patch('builtins.open', new_callable=mock.mock_open)
@mock.patch.object(lockutils, 'external_lock')
@mock.patch('os.path.exists', side_effect=[True, False])
@mock.patch('time.sleep')
def test_cancel_operation_immediate_cancel(
self, mock_sleep, mock_exists, mock_external_lock, mock_open):
op_id = 'op3'
tracker.register_operation(op_id)
tracker.cancel_operation(op_id)
self.assertTrue(mock_external_lock.called)
self.assertTrue(mock_open.called)
mock_sleep.assert_not_called()
mock_exists.assert_any_call(tracker.path_for_op(op_id))
def test_register_operation_already_exists(self):
op_id = 'op4'
tracker.register_operation(op_id)
self.assertRaises(RuntimeError, tracker.register_operation, op_id)
def test_signal_finished_file_not_found(self):
op_id = 'op5'
# Should not raise
tracker.signal_finished(op_id)
@mock.patch('os.path.exists')
@mock.patch('time.sleep', return_value=None)
def test_cancel_operation_timeout(self, mock_sleep, mock_exists):
mock_exists.return_value = True
op_id = 'op6'
self.assertRaises(
exception.ServerError, tracker.cancel_operation, op_id)
self.assertTrue(mock_sleep.called)
@mock.patch('time.sleep', return_value=None)
def test_cancel_operation_eventually_cancels(self, mock_sleep):
op_id = 'op7'
tracker.register_operation(op_id)
def side_effect(path):
# After 3 calls, simulate file removal
if mock_sleep.call_count >= 3:
return False
return True
with mock.patch('os.path.exists', side_effect=side_effect):
with mock.patch('os.path.getsize', return_value=1):
tracker.cancel_operation(op_id)
self.assertEqual(mock_sleep.call_count, 3)
@mock.patch('os.path.exists')
def test_cancel_operation_not_registered(self, mock_exists):
mock_exists.return_value = False
op_id = 'op8'
self.assertFalse(tracker.is_canceled(op_id))
self.assertRaises(
exception.ServerError, tracker.cancel_operation, op_id)