From 586ca78a230e98d286241080fb22d0b3ec2304b3 Mon Sep 17 00:00:00 2001 From: Dan Smith Date: Mon, 27 Jul 2020 10:12:40 -0700 Subject: [PATCH] Heartbeat the actual work of the task This introduces a CallbackIterator object that can sit in the data pipeline when setting image data and provide periodic callbacks to allow recording status, checking for cancellation, etc. If the callback raises an exception, that will bubble up to the read operation and cause an abort. The heartbeat is timer-based, but cooperative, which means if the data pipeline stalls, we will stop heartbeating. The goal is to provide a callback about once per minute, although blocking on the pipeline could make that less regular of course. Change-Id: I136ef792d601326a67266e9ea9fcadd79ddba69e (cherry picked from commit e49f23c04d784b5312d7df4556e2e5064555a84f) --- glance/async_/flows/api_image_import.py | 28 +++++- glance/common/scripts/image_import/main.py | 8 +- glance/common/scripts/utils.py | 97 +++++++++++++++++++ .../async_/flows/test_api_image_import.py | 86 +++++++++++++++- .../common/scripts/image_import/test_main.py | 26 +++++ .../unit/common/scripts/test_scripts_utils.py | 80 +++++++++++++++ 6 files changed, 317 insertions(+), 8 deletions(-) diff --git a/glance/async_/flows/api_image_import.py b/glance/async_/flows/api_image_import.py index f26d16b4e7..74970ef9c6 100644 --- a/glance/async_/flows/api_image_import.py +++ b/glance/async_/flows/api_image_import.py @@ -12,6 +12,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import functools import os import glance_store as store_api @@ -20,6 +21,8 @@ from glance_store import exceptions as store_exceptions from oslo_config import cfg from oslo_log import log as logging from oslo_utils import encodeutils +from oslo_utils import timeutils +from oslo_utils import units import six from taskflow.patterns import linear_flow as lf from taskflow import retry @@ -203,7 +206,8 @@ class _ImportActions(object): """ self.merge_store_list(self.IMPORT_FAILED_KEY, stores, subtract=True) - def set_image_data(self, uri, task_id, backend, set_active): + def set_image_data(self, uri, task_id, backend, set_active, + callback=None): """Populate image with data on a specific backend. This is used during an image import operation to populate the data @@ -218,10 +222,17 @@ class _ImportActions(object): :param backend: The backend store to target the data :param set_active: Whether or not to set the image to 'active' state after the operation completes + :param callback: A callback function with signature: + fn(action, chunk_bytes, total_bytes) + which should be called while processing the image + approximately every minute. """ + if callback: + callback = functools.partial(callback, self) return image_import.set_image_data(self._image, uri, task_id, backend=backend, - set_active=set_active) + set_active=set_active, + callback=callback) def set_image_status(self, status): """Set the image status. @@ -392,6 +403,7 @@ class _ImportToStore(task.Task): self.backend = backend self.all_stores_must_succeed = all_stores_must_succeed self.set_active = set_active + self.last_status = 0 super(_ImportToStore, self).__init__( name='%s-ImportToStore-%s' % (task_type, task_id)) @@ -445,6 +457,7 @@ class _ImportToStore(task.Task): self._execute(action, file_path) def _execute(self, action, file_path): + self.last_status = timeutils.now() if action.image_status == "deleted": raise exception.ImportTaskError("Image has been deleted, aborting" @@ -452,7 +465,8 @@ class _ImportToStore(task.Task): try: action.set_image_data(file_path or self.uri, self.task_id, backend=self.backend, - set_active=self.set_active) + set_active=self.set_active, + callback=self._status_callback) # NOTE(yebinama): set_image_data catches Exception and raises from # them. Can't be more specific on exceptions catched. except Exception: @@ -468,6 +482,14 @@ class _ImportToStore(task.Task): if self.backend is not None: action.remove_importing_stores([self.backend]) + def _status_callback(self, action, chunk_bytes, total_bytes): + # NOTE(danms): Only log status every five minutes + if timeutils.now() - self.last_status > 300: + LOG.debug('Image import %(image_id)s copied %(copied)i MiB', + {'image_id': action.image_id, + 'copied': total_bytes // units.Mi}) + self.last_status = timeutils.now() + def revert(self, result, **kwargs): """ Remove location from image in case of failure diff --git a/glance/common/scripts/image_import/main.py b/glance/common/scripts/image_import/main.py index 25116fc77c..9b624920db 100644 --- a/glance/common/scripts/image_import/main.py +++ b/glance/common/scripts/image_import/main.py @@ -137,12 +137,18 @@ def create_image(image_repo, image_factory, image_properties, task_id): return image -def set_image_data(image, uri, task_id, backend=None, set_active=True): +def set_image_data(image, uri, task_id, backend=None, set_active=True, + callback=None): data_iter = None try: LOG.info(_LI("Task %(task_id)s: Got image data uri %(data_uri)s to be " "imported"), {"data_uri": uri, "task_id": task_id}) data_iter = script_utils.get_image_data_iter(uri) + if callback: + # If a callback was provided, wrap our data iterator to call + # the function every 60 seconds. + data_iter = script_utils.CallbackIterator( + data_iter, callback, min_interval=60) image.set_data(data_iter, backend=backend, set_active=set_active) except Exception as e: with excutils.save_and_reraise_exception(): diff --git a/glance/common/scripts/utils.py b/glance/common/scripts/utils.py index 533e49426a..25e7fcffb6 100644 --- a/glance/common/scripts/utils.py +++ b/glance/common/scripts/utils.py @@ -23,6 +23,7 @@ __all__ = [ from oslo_log import log as logging +from oslo_utils import timeutils from six.moves import urllib from glance.common import exception @@ -139,3 +140,99 @@ def get_image_data_iter(uri): return open(uri, "rb") return urllib.request.urlopen(uri) + + +class CallbackIterator(object): + """A proxy iterator that calls a callback function periodically + + This is used to wrap a reading file object and proxy its chunks + through to another caller. Periodically, the callback function + will be called with information about the data processed so far, + allowing for status updating or cancel flag checking. The function + can be called every time we process a chunk, or only after we have + processed a certain amount of data since the last call. + + :param source: A source iterator whose content will be proxied + through this object. + :param callback: A function to be called periodically while iterating. + The signature should be fn(chunk_bytes, total_bytes), + where chunk is the number of bytes since the last + call of the callback, and total_bytes is the total amount + copied thus far. + :param min_interval: Limit the calls to callback to only when this many + seconds have elapsed since the last callback (a + close() or final iteration may fire the callback in + less time to ensure completion). + """ + + def __init__(self, source, callback, min_interval=None): + self._source = source + self._callback = callback + self._min_interval = min_interval + self._chunk_bytes = 0 + self._total_bytes = 0 + self._timer = None + + @property + def callback_due(self): + """Indicates if a callback should be made. + + If no time-based limit is set, this will always be True. + If a limit is set, then this returns True exactly once, + resetting the timer when it does. + """ + if not self._min_interval: + return True + + if not self._timer: + self._timer = timeutils.StopWatch(self._min_interval) + self._timer.start() + + if self._timer.expired(): + self._timer.restart() + return True + else: + return False + + def __iter__(self): + return self + + def __next__(self): + try: + chunk = next(self._source) + except StopIteration: + # NOTE(danms): Make sure we call the callback the last + # time if we have processed data since the last one. + self._call_callback(b'', is_last=True) + raise + + self._call_callback(chunk) + return chunk + + def close(self): + self._call_callback(b'', is_last=True) + if hasattr(self._source, 'close'): + return self._source.close() + + def _call_callback(self, chunk, is_last=False): + self._total_bytes += len(chunk) + self._chunk_bytes += len(chunk) + + if not self._chunk_bytes: + # NOTE(danms): Never call the callback if we haven't processed + # any data since the last time + return + + if is_last or self.callback_due: + # FIXME(danms): Perhaps we should only abort the read if + # the callback raises a known abort exception, otherwise + # log and swallow. Need to figure out what exception + # read() callers would be expecting that we could raise + # from here. + self._callback(self._chunk_bytes, self._total_bytes) + self._chunk_bytes = 0 + + def read(self, size=None): + chunk = self._source.read(size) + self._call_callback(chunk) + return chunk diff --git a/glance/tests/unit/async_/flows/test_api_image_import.py b/glance/tests/unit/async_/flows/test_api_image_import.py index 25f434beb7..7d10a40203 100644 --- a/glance/tests/unit/async_/flows/test_api_image_import.py +++ b/glance/tests/unit/async_/flows/test_api_image_import.py @@ -143,7 +143,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase): action.set_image_data.assert_called_once_with( mock.sentinel.path, TASK_ID1, backend='store1', - set_active=True) + set_active=True, + callback=image_import._status_callback) action.remove_importing_stores(['store1']) def test_execute_body_with_store_no_path(self): @@ -161,7 +162,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase): action.set_image_data.assert_called_once_with( 'http://url', TASK_ID1, backend='store1', - set_active=True) + set_active=True, + callback=image_import._status_callback) action.remove_importing_stores(['store1']) def test_execute_body_without_store(self): @@ -179,9 +181,63 @@ class TestImportToStoreTask(test_utils.BaseTestCase): action.set_image_data.assert_called_once_with( mock.sentinel.path, TASK_ID1, backend=None, - set_active=True) + set_active=True, + callback=image_import._status_callback) action.remove_importing_stores.assert_not_called() + @mock.patch('glance.async_.flows.api_image_import.LOG.debug') + @mock.patch('oslo_utils.timeutils.now') + def test_status_callback_limits_rate(self, mock_now, mock_log): + img_repo = mock.MagicMock() + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, + wrapper, + "http://url", + None, False, + True) + + expected_calls = [] + log_call = mock.call('Image import %(image_id)s copied %(copied)i MiB', + {'image_id': IMAGE_ID1, + 'copied': 0}) + action = mock.MagicMock(image_id=IMAGE_ID1) + + mock_now.return_value = 1000 + image_import._status_callback(action, 32, 32) + # First call will emit immediately because we only ran __init__ + # which sets the last status to zero + expected_calls.append(log_call) + mock_log.assert_has_calls(expected_calls) + + image_import._status_callback(action, 32, 64) + # Second call will not emit any other logs because no time + # has passed + mock_log.assert_has_calls(expected_calls) + + mock_now.return_value += 190 + image_import._status_callback(action, 32, 96) + # Third call will not emit any other logs because not enough + # time has passed + mock_log.assert_has_calls(expected_calls) + + mock_now.return_value += 300 + image_import._status_callback(action, 32, 128) + # Fourth call will emit because we crossed five minutes + expected_calls.append(log_call) + mock_log.assert_has_calls(expected_calls) + + mock_now.return_value += 150 + image_import._status_callback(action, 32, 128) + # Fifth call will not emit any other logs because not enough + # time has passed + mock_log.assert_has_calls(expected_calls) + + mock_now.return_value += 3600 + image_import._status_callback(action, 32, 128) + # Sixth call will emit because we crossed five minutes + expected_calls.append(log_call) + mock_log.assert_has_calls(expected_calls) + def test_raises_when_image_deleted(self): img_repo = mock.MagicMock() wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) @@ -525,7 +581,29 @@ class TestImportActions(test_utils.BaseTestCase): mock.sentinel.backend, mock.sentinel.set_active)) mock_sid.assert_called_once_with( self.image, mock.sentinel.uri, mock.sentinel.task_id, - backend=mock.sentinel.backend, set_active=mock.sentinel.set_active) + backend=mock.sentinel.backend, set_active=mock.sentinel.set_active, + callback=None) + + @mock.patch.object(image_import, 'set_image_data') + def test_set_image_data_with_callback(self, mock_sid): + def fake_set_image_data(image, uri, task_id, backend=None, + set_active=False, + callback=None): + callback(mock.sentinel.chunk, mock.sentinel.total) + + mock_sid.side_effect = fake_set_image_data + + callback = mock.MagicMock() + self.actions.set_image_data(mock.sentinel.uri, mock.sentinel.task_id, + mock.sentinel.backend, + mock.sentinel.set_active, + callback=callback) + + # Make sure our callback was triggered through the functools.partial + # to include the original params and the action wrapper + callback.assert_called_once_with(self.actions, + mock.sentinel.chunk, + mock.sentinel.total) def test_remove_location_for_store(self): self.image.locations = [ diff --git a/glance/tests/unit/common/scripts/image_import/test_main.py b/glance/tests/unit/common/scripts/image_import/test_main.py index db06ec85ba..77f7e7f72d 100644 --- a/glance/tests/unit/common/scripts/image_import/test_main.py +++ b/glance/tests/unit/common/scripts/image_import/test_main.py @@ -122,3 +122,29 @@ class TestImageImport(test_utils.BaseTestCase): self.assertEqual(1, mock_set_img_data.call_count) mock_delete_data.assert_called_once_with( mock_create_image().context, image_id, 'location') + + @mock.patch('oslo_utils.timeutils.StopWatch') + @mock.patch('glance.common.scripts.utils.get_image_data_iter') + def test_set_image_data_with_callback(self, mock_gidi, mock_sw): + data = [b'0' * 60, b'0' * 50, b'0' * 10, b'0' * 150] + result_data = [] + mock_gidi.return_value = iter(data) + mock_sw.return_value.expired.side_effect = [False, True, False, + False] + image = mock.MagicMock() + callback = mock.MagicMock() + + def fake_set_data(data_iter, **kwargs): + for chunk in data_iter: + result_data.append(chunk) + + image.set_data.side_effect = fake_set_data + image_import_script.set_image_data(image, 'http://fake', None, + callback=callback) + + mock_gidi.assert_called_once_with('http://fake') + self.assertEqual(data, result_data) + # Since we only fired the timer once, only two calls expected + # for the four reads we did, including the final obligatory one + callback.assert_has_calls([mock.call(110, 110), + mock.call(160, 270)]) diff --git a/glance/tests/unit/common/scripts/test_scripts_utils.py b/glance/tests/unit/common/scripts/test_scripts_utils.py index 986c1ee6ec..7bada7e51d 100644 --- a/glance/tests/unit/common/scripts/test_scripts_utils.py +++ b/glance/tests/unit/common/scripts/test_scripts_utils.py @@ -124,3 +124,83 @@ class TestScriptsUtils(test_utils.BaseTestCase): location = 'cinder://' self.assertRaises(urllib.error.URLError, script_utils.validate_location_uri, location) + + +class TestCallbackIterator(test_utils.BaseTestCase): + def test_iterator_iterates(self): + # Include a zero-length generation to make sure we don't trigger + # the callback when nothing new has happened. + items = ['1', '2', '', '3'] + callback = mock.MagicMock() + cb_iter = script_utils.CallbackIterator(iter(items), callback) + iter_items = list(cb_iter) + callback.assert_has_calls([mock.call(1, 1), + mock.call(1, 2), + mock.call(1, 3)]) + self.assertEqual(items, iter_items) + + # Make sure we don't call the callback on close if we + # have processed all the data + callback.reset_mock() + cb_iter.close() + callback.assert_not_called() + + @mock.patch('oslo_utils.timeutils.StopWatch') + def test_iterator_iterates_granularly(self, mock_sw): + items = ['1', '2', '3'] + callback = mock.MagicMock() + mock_sw.return_value.expired.side_effect = [False, True, False] + cb_iter = script_utils.CallbackIterator(iter(items), callback, + min_interval=30) + iter_items = list(cb_iter) + self.assertEqual(items, iter_items) + # The timer only fired once, but we should still expect the final + # chunk to be emitted. + callback.assert_has_calls([mock.call(2, 2), + mock.call(1, 3)]) + + mock_sw.assert_called_once_with(30) + mock_sw.return_value.start.assert_called_once_with() + mock_sw.return_value.restart.assert_called_once_with() + + # Make sure we don't call the callback on close if we + # have processed all the data + callback.reset_mock() + cb_iter.close() + callback.assert_not_called() + + def test_proxy_close(self): + callback = mock.MagicMock() + source = mock.MagicMock() + del source.close + # NOTE(danms): This will generate AttributeError if it + # tries to call close after the del above. + script_utils.CallbackIterator(source, callback).close() + + source = mock.MagicMock() + source.close.return_value = 'foo' + script_utils.CallbackIterator(source, callback).close() + source.close.assert_called_once_with() + + # We didn't process any data, so no callback should be expected + callback.assert_not_called() + + @mock.patch('oslo_utils.timeutils.StopWatch') + def test_proxy_read(self, mock_sw): + items = ['1', '2', '3'] + source = mock.MagicMock() + source.read.side_effect = items + callback = mock.MagicMock() + mock_sw.return_value.expired.side_effect = [False, True, False] + cb_iter = script_utils.CallbackIterator(source, callback, + min_interval=30) + results = [cb_iter.read(1) for i in range(len(items))] + self.assertEqual(items, results) + # The timer only fired once while reading, so we only expect + # one callback. + callback.assert_has_calls([mock.call(2, 2)]) + cb_iter.close() + # If we close with residue since the last callback, we should + # call the callback with that. + callback.assert_has_calls([mock.call(2, 2), + mock.call(1, 3)])