diff --git a/glance/api/v2/images.py b/glance/api/v2/images.py
index 885b901bb0..329e60ab95 100644
--- a/glance/api/v2/images.py
+++ b/glance/api/v2/images.py
@@ -47,6 +47,7 @@ from glance.i18n import _, _LE, _LI, _LW
 import glance.notifier
 from glance.quota import keystone as ks_quota
 import glance.schema
+from glance import task_cancellation_tracker as tracker
 
 LOG = logging.getLogger(__name__)
 
@@ -57,6 +58,8 @@ CONF.import_opt('container_formats', 'glance.common.config',
 CONF.import_opt('show_multiple_locations', 'glance.common.config')
 CONF.import_opt('hashing_algorithm', 'glance.common.config')
 
+PROXYABLE_HOSTS = ['os_glance_stage_host', 'os_glance_hash_op_host']
+
 
 def proxy_response_error(orig_code, orig_explanation):
     """Construct a webob.exc.HTTPError exception on the fly.
@@ -237,12 +240,13 @@ class ImagesController(object):
                  {'image': image.image_id, 'task': task.task_id,
                   'keys': ','.join(changed)})
 
-    def _proxy_request_to_stage_host(self, image, req, body=None):
-        """Proxy a request to a staging host.
+    def _proxy_request_to_host(self, image, req, body=None):
+        """Proxy a request to the host that owns the operation.
 
-        When an image was staged on another worker, that worker may record its
-        worker_self_reference_url on the image, indicating that other workers
-        should proxy requests to it while the image is staged. This method
+        When an image was staged on another worker, or another worker is
+        performing the hash calculation for a location add workflow, that
+        worker may record its worker_self_reference_url on the image,
+        indicating that other workers should proxy requests to it. This method
         replays our current request against the remote host, returns the
         result, and performs any response error translation required.
 
@@ -258,13 +262,12 @@ class ImagesController(object):
                  remote host.
 
        """
-        stage_host = image.extra_properties['os_glance_stage_host']
+        host = self.is_proxyable(image)
         LOG.info(_LI('Proxying %s request to host %s '
-                     'which has image staged'),
-                 req.method, stage_host)
+                     'which has the image'),
+                 req.method, host)
         client = glance_context.get_ksa_client(req.context)
-        url = '%s%s' % (stage_host, req.path)
-        req_id_hdr = 'x-openstack-request-id'
+        url = '%s%s' % (host, req.path)
         request_method = getattr(client, req.method.lower())
         try:
             r = request_method(url, json=body, timeout=60)
@@ -294,17 +297,18 @@ class ImagesController(object):
         return CONF.worker_self_reference_url or CONF.public_endpoint
 
     def is_proxyable(self, image):
-        """Decide if an action is proxyable to a stage host.
-
-        If the image has a staging host recorded with a URL that does not match
-        ours, then we can proxy our request to that host.
+        """
+        Return the host from extra_properties for any key in PROXYABLE_HOSTS
+        if it does not match our own URL.
 
        :param image: The Image from the repo
-        :returns: bool indicating proxyable status
+        :returns: host string if proxyable, else None
        """
-        return (
-            'os_glance_stage_host' in image.extra_properties and
-            image.extra_properties['os_glance_stage_host'] != self.self_url)
+        for host_key in PROXYABLE_HOSTS:
+            host = image.extra_properties.get(host_key)
+            if host and host != self.self_url:
+                return host
+        return None
 
     @utils.mutating
     def import_image(self, req, image_id, body):
@@ -435,7 +439,7 @@ class ImagesController(object):
             # NOTE(danms): Image is staged on another worker; proxy the
             # import request to that worker with the user's token, as if
             # they had called it themselves.
-            return self._proxy_request_to_stage_host(image, req, body)
+            return self._proxy_request_to_host(image, req, body)
 
         task_input = {'image_id': image_id,
                       'import_req': body,
@@ -834,7 +838,7 @@ class ImagesController(object):
        :raises: webob.exc.HTTPClientError if so raised by the remote server.
        """
        try:
-            self._proxy_request_to_stage_host(image, req)
+            self._proxy_request_to_host(image, req)
        except webob.exc.HTTPServerError:
            # This means we would have raised a 50x error, indicating
            # we did not succeed with the request to the remote host.
@@ -867,6 +871,15 @@ class ImagesController(object):
            # Delete was proxied, so we are done here.
            return
 
+        # NOTE(abhishekk): Cancel any in-progress hash calculation
+        # operation and signal the worker that is performing it to
+        # stop.
+        operation_id = image.extra_properties.get('os_glance_import_task')
+        os_hashing_host = image.extra_properties.get(
+            'os_glance_hash_op_host')
+        if operation_id and os_hashing_host:
+            tracker.cancel_operation(operation_id)
+
        # NOTE(abhishekk): Delete the data from staging area
        if CONF.enabled_backends:
            separator, staging_dir = store_utils.get_dir_separator()
@@ -1697,7 +1710,7 @@ class RequestDeserializer(wsgi.JSONRequestDeserializer):
 class ResponseSerializer(wsgi.JSONResponseSerializer):
     # These properties will be filtered out from the response and not
     # exposed to the client
-    _hidden_properties = ['os_glance_stage_host']
+    _hidden_properties = ['os_glance_stage_host', 'os_glance_hash_op_host']
 
     def __init__(self, schema=None, location_schema=None):
         super(ResponseSerializer, self).__init__()
diff --git a/glance/async_/flows/location_import.py b/glance/async_/flows/location_import.py
index b9faf45b0f..09749dd206 100644
--- a/glance/async_/flows/location_import.py
+++ b/glance/async_/flows/location_import.py
@@ -18,6 +18,7 @@ import glance_store as store
 from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_utils import encodeutils
+from oslo_utils import excutils
 from taskflow.patterns import linear_flow as lf
 from taskflow import retry
 from taskflow import task
@@ -26,6 +27,7 @@ import glance.async_.flows.api_image_import as image_import
 from glance.common import exception
 from glance.common import store_utils
 from glance.i18n import _, _LW
+from glance import task_cancellation_tracker as tracker
 
 LOG = logging.getLogger(__name__)
 
@@ -44,6 +46,12 @@ class _InvalidLocation(exception.GlanceException):
         super(_InvalidLocation, self).__init__(message)
 
 
+class _HashCalculationCanceled(exception.GlanceException):
+
+    def __init__(self, message):
+        super(_HashCalculationCanceled, self).__init__(message)
+
+
 class _CalculateHash(task.Task):
 
     def __init__(self, task_id, task_type, image_repo, image_id,
@@ -61,6 +69,10 @@ class _CalculateHash(task.Task):
         current_os_hash_value = hashlib.new(self.hashing_algo)
         current_checksum = hashlib.md5(usedforsecurity=False)
         for chunk in image.get_data():
+            if tracker.is_canceled(self.task_id):
+                raise _HashCalculationCanceled(
+                    _('Hash calculation for image %s has been '
+                      'canceled') % self.image_id)
             if chunk is None:
                 break
             current_checksum.update(chunk)
@@ -69,12 +81,17 @@ class _CalculateHash(task.Task):
         image.os_hash_value = current_os_hash_value.hexdigest()
 
     def _set_checksum_and_hash(self, image):
+        tracker.register_operation(self.task_id)
         retries = 0
         while retries <= CONF.http_retries and image.os_hash_value is None:
             retries += 1
             try:
                 self._calculate_hash(image)
                 self.image_repo.save(image)
+            except _HashCalculationCanceled as e:
+                with excutils.save_and_reraise_exception():
+                    LOG.debug('Hash calculation cancelled: %s',
+                              encodeutils.exception_to_unicode(e))
             except IOError as e:
                 LOG.debug('[%i/%i] Hash calculation failed due to %s',
                           retries, CONF.http_retries,
                           encodeutils.exception_to_unicode(e))
@@ -92,26 +109,25 @@ class _CalculateHash(task.Task):
                              "data") % self.image_id)
                     LOG.warning(msg)
             except store.exceptions.NotFound:
-                # NOTE(pdeore): This can happen if image delete attempted
-                # when hash calculation is in progress, which deletes the
-                # image data from backend(specially rbd) but image remains
-                # in 'active' state.
-                # see: https://bugs.launchpad.net/glance/+bug/2045769
-                # Once this ceph side issue is fixed, we'll keep only the
-                # warning message here and will remove the deletion part
-                # which is a temporary workaround.
                 LOG.debug(_('Failed to calculate checksum of %(image_id)s '
                             'as image data has been deleted from the '
                             'backend'), {'image_id': self.image_id})
-                image.delete()
-                self.image_repo.remove(image)
-                break
+            finally:
+                tracker.signal_finished(self.task_id)
+                image.extra_properties.pop('os_glance_hash_op_host', None)
+                self.image_repo.save(image)
 
     def execute(self):
         image = self.image_repo.get(self.image_id)
         if image.status == 'queued':
             image.status = self.image_status
             image.os_hash_algo = self.hashing_algo
+            # NOTE(abhishekk): Record this worker's
+            # worker_self_reference_url in the image metadata, so we
+            # know who is calculating the checksum and hash.
+            self_url = CONF.worker_self_reference_url or CONF.public_endpoint
+            if self_url:
+                image.extra_properties['os_glance_hash_op_host'] = self_url
             self.image_repo.save(image)
 
         self._set_checksum_and_hash(image)
@@ -121,6 +137,7 @@ class _CalculateHash(task.Task):
        state
        """
        try:
+            tracker.signal_finished(self.task_id)
            image = self.image_repo.get(self.image_id)
            if image.status == 'importing':
                if not image.locations[0]['url'].startswith("http"):
diff --git a/glance/tests/functional/__init__.py b/glance/tests/functional/__init__.py
index d2da994e41..530ca8de93 100644
--- a/glance/tests/functional/__init__.py
+++ b/glance/tests/functional/__init__.py
@@ -1455,6 +1455,7 @@ class SynchronousAPIBase(test_utils.BaseTestCase):
                     group='glance_store')
         node_staging_uri = 'file://%s' % os.path.join(
             self.test_dir, 'staging')
+        utils.safe_mkdirs(node_staging_uri[7:])
         self.config(node_staging_uri=node_staging_uri)
         self.config(default_store='file', group='glance_store')
         glance_store.create_stores(CONF)
diff --git a/glance/tests/unit/async_/flows/test_location_import.py b/glance/tests/unit/async_/flows/test_location_import.py
index c68f0c29ff..f54a6bae6d 100644
--- a/glance/tests/unit/async_/flows/test_location_import.py
+++ b/glance/tests/unit/async_/flows/test_location_import.py
@@ -24,6 +24,7 @@ from oslo_utils import units
 import glance.async_.flows.location_import as import_flow
 from glance.common import exception
 from glance import context
+from glance import task_cancellation_tracker as task_tracker
 import glance.tests.unit.utils as unit_test_utils
 import glance.tests.utils as test_utils
 
@@ -58,7 +59,10 @@ class TestCalculateHashTask(test_utils.BaseTestCase):
                                               project_id=TENANT1,
                                               overwrite=False)
 
-    def test_execute_calculate_hash(self):
+    @mock.patch.object(task_tracker, 'signal_finished')
+    @mock.patch.object(task_tracker, 'register_operation')
+    def test_execute_calculate_hash(self, mock_register_operation,
+                                    mock_signal_finished):
         self.loc_url = '%s/fake_location_1' % (BASE_URI)
         self.image.status = 'queued'
         hashing_algo = CONF.hashing_algorithm
@@ -85,7 +89,54 @@ class TestCalculateHashTask(test_utils.BaseTestCase):
         self.assertIsNotNone(self.image.os_hash_value)
         self.assertEqual('active', self.image.status)
 
-    def test_hash_calculation_retry_count(self):
+    @mock.patch.object(task_tracker, 'register_operation')
+    def test_execute_cancel_hash_op(self, mock_register_operation):
+        self.loc_url = '%s/fake_location_1' % (BASE_URI)
+        self.image.status = 'queued'
+        hashing_algo = CONF.hashing_algorithm
+
+        location_update = import_flow._UpdateLocationTask(
+            TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url,
+            self.context)
+
+        location_update.execute()
+        self.assertEqual(1, self.image.locations.append.call_count)
+
+        set_image_active = import_flow._SetImageToActiveTask(
+            TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1)
+        set_image_active.execute()
+        self.assertEqual('active', self.image.status)
+
+        # Simulate cancellation
+        expected_size = 4 * units.Ki
+        expected_data = b"*" * expected_size
+        self.image.get_data.return_value = io.BytesIO(expected_data)
+        self.image.checksum = None
+        self.image.os_hash_value = None
+        hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE,
+                                                      self.image_repo,
+                                                      IMAGE_ID1,
+                                                      hashing_algo)
+
+        with mock.patch.object(
+                task_tracker, 'is_canceled') as mock_is_canceled:
+            mock_is_canceled.return_value = True
+            with mock.patch.object(
+                    import_flow.LOG, 'debug') as mock_debug:
+                self.assertRaises(import_flow._HashCalculationCanceled,
+                                  hash_calculation.execute)
+                mock_debug.assert_any_call('Hash calculation cancelled: %s',
+                                           mock.ANY)
+
+        mock_register_operation.assert_called()
+        self.assertIsNone(self.image.checksum)
+        self.assertIsNone(self.image.os_hash_value)
+        self.assertEqual('active', self.image.status)
+
+    @mock.patch.object(task_tracker, 'signal_finished')
+    @mock.patch.object(task_tracker, 'register_operation')
+    def test_hash_calculation_retry_count(self, mock_register_operation,
+                                          mock_signal_finished):
         hashing_algo = CONF.hashing_algorithm
         self.image.checksum = None
         self.image.os_hash_value = None
@@ -109,7 +160,10 @@ class TestCalculateHashTask(test_utils.BaseTestCase):
         hash_calculation.revert(None)
         self.assertIsNone(self.image.os_hash_algo)
 
-    def test_execute_hash_calculation_fails_without_validation_data(self):
+    @mock.patch.object(task_tracker, 'signal_finished')
+    @mock.patch.object(task_tracker, 'register_operation')
+    def test_execute_hash_calculation_fails_without_validation_data(
+            self, mock_register_operation, mock_signal_finished):
         self.loc_url = '%s/fake_location_1' % (BASE_URI)
         self.image.status = 'queued'
         self.hash_task_input.update(loc_url=self.loc_url)
@@ -173,7 +227,10 @@ class TestCalculateHashTask(test_utils.BaseTestCase):
         self.assertEqual('active', self.image.status)
         self.assertEqual(1, len(self.image.locations))
 
-    def test_execute_hash_calculation_fails_for_store_other_that_http(self):
+    @mock.patch.object(task_tracker, 'signal_finished')
+    @mock.patch.object(task_tracker, 'register_operation')
+    def test_execute_hash_calculation_fails_for_store_other_that_http(
+            self, mock_register_operation, mock_signal_finished):
         self.loc_url = "cinder://image/fake_location"
         self.hash_task_input.update(loc_url=self.loc_url)
         self.image.status = 'queued'
@@ -215,7 +272,10 @@ class TestCalculateHashTask(test_utils.BaseTestCase):
         self.assertEqual('queued', self.image.status)
         self.assertEqual(0, len(self.image.locations))
 
-    def test_execute_hash_calculation_fails_if_image_data_deleted(self):
+    @mock.patch.object(task_tracker, 'signal_finished')
+    @mock.patch.object(task_tracker, 'register_operation')
+    def test_execute_hash_calculation_fails_if_image_data_deleted(
+            self, mock_register_operation, mock_signal_finished):
         self.loc_url = '%s/fake_location_1' % (BASE_URI)
         self.image.status = 'queued'
         self.hash_task_input.update(loc_url=self.loc_url)
@@ -241,10 +301,6 @@ class TestCalculateHashTask(test_utils.BaseTestCase):
                                                       hashing_algo)
         self.image.get_data.side_effect = store.exceptions.NotFound
         hash_calculation.execute()
-        # Check if Image delete and image_repo.delete has been called
-        # if exception raised
-        self.image.delete.assert_called_once()
-        self.image_repo.remove.assert_called_once_with(self.image)
 
 
 class TestVerifyValidationDataTask(test_utils.BaseTestCase):
@@ -262,7 +318,13 @@ class TestVerifyValidationDataTask(test_utils.BaseTestCase):
         self.image.container_format = 'bare'
         self.config(do_secure_hash=True)
 
-    def test_execute_with_valid_validation_data(self):
+    @mock.patch.object(task_tracker, 'is_canceled')
+    @mock.patch.object(task_tracker, 'signal_finished')
+    @mock.patch.object(task_tracker, 'register_operation')
+    def test_execute_with_valid_validation_data(self, mock_register_operation,
+                                                mock_signal_finished,
+                                                mock_is_canceled):
+        mock_is_canceled.return_value = False
         url = '%s/fake_location_1' % BASE_URI
         self.image.status = 'queued'
         self.image.locations = {"url": url, "metadata": {"store": "foo"}}
@@ -302,7 +364,13 @@ class TestVerifyValidationDataTask(test_utils.BaseTestCase):
         set_image_active.execute()
         self.assertEqual('active', self.image.status)
 
-    def test_execute_with_os_hash_value_other_than_512(self):
+    @mock.patch.object(task_tracker, 'is_canceled')
+    @mock.patch.object(task_tracker, 'signal_finished')
+    @mock.patch.object(task_tracker, 'register_operation')
+    def test_execute_with_os_hash_value_other_than_512(
+            self, mock_register_operation, mock_signal_finished,
+            mock_is_canceled):
+        mock_is_canceled.return_value = False
         url = '%s/fake_location_1' % BASE_URI
         self.image.status = 'queued'
         self.image.locations = {"url": url, "metadata": {"store": "foo"}}
@@ -340,7 +408,10 @@ class TestVerifyValidationDataTask(test_utils.BaseTestCase):
         set_image_active.execute()
         self.assertEqual('active', self.image.status)
 
-    def test_execute_with_invalid_validation_data(self):
+    @mock.patch.object(task_tracker, 'signal_finished')
+    @mock.patch.object(task_tracker, 'register_operation')
+    def test_execute_with_invalid_validation_data(
+            self, mock_register_operation, mock_signal_finished):
         url = '%s/fake_location_1' % BASE_URI
         self.image.status = 'queued'
         self.image.locations = [{"url": url, "metadata": {"store": "foo"}}]
diff --git a/glance/tests/unit/v2/test_images_resource.py b/glance/tests/unit/v2/test_images_resource.py
index f971289047..2336655764 100644
--- a/glance/tests/unit/v2/test_images_resource.py
+++ b/glance/tests/unit/v2/test_images_resource.py
@@ -38,6 +38,7 @@ from glance.common import store_utils
 from glance import domain
 import glance.notifier
 import glance.schema
+from glance import task_cancellation_tracker as tracker
 from glance.tests.unit import base
 from glance.tests.unit.keymgr import fake as fake_keymgr
 import glance.tests.unit.utils as unit_test_utils
@@ -3168,6 +3169,58 @@ class TestImagesController(base.IsolatedUnitTest):
         self.assertEqual('deleted', deleted_img['status'])
         self.assertNotIn('%s/%s' % (BASE_URI, UUID1), self.store.data)
 
+    def test_delete_cancels_operation(self):
+        request = unit_test_utils.get_fake_request()
+        # Set up an image with an import task operation id
+        operation_id = 'fake-operation-id'
+        hash_op_host = 'https://glance-worker1.openstack.org'
+        extra_props = {
+            'os_glance_import_task': operation_id,
+            'os_glance_hash_op_host': hash_op_host,
+        }
+        self.db.image_update(None, UUID1, {'properties': extra_props})
+        self.assertIn('%s/%s' % (BASE_URI, UUID1), self.store.data)
+
+        with mock.patch.object(
+                self.controller, 'is_proxyable') as mock_is_proxyable:
+            mock_is_proxyable.return_value = False
+            with mock.patch.object(tracker, 'cancel_operation') as mock_cancel:
+                self.controller.delete(request, UUID1)
+                mock_cancel.assert_called_once_with(operation_id)
+
+        deleted_img = self.db.image_get(
+            request.context, UUID1, force_show_deleted=True)
+        self.assertTrue(deleted_img['deleted'])
+        self.assertEqual('deleted', deleted_img['status'])
+        self.assertNotIn('%s/%s' % (BASE_URI, UUID1), self.store.data)
+
+    @mock.patch('glance.context.get_ksa_client')
+    def test_image_delete_proxies_hash_op_host(self, mock_client):
+        # Make sure that we proxy to the remote side when we need to
+        self.config(
+            worker_self_reference_url='http://glance-worker2.openstack.org')
+        request = unit_test_utils.get_fake_request(
+            '/v2/images/%s' % UUID4, method='DELETE')
+        with mock.patch.object(
+                glance.notifier.ImageRepoProxy, 'get') as m_get:
+            m_get.return_value = FakeImage(status='uploading')
+            m_get.return_value.extra_properties['os_glance_hash_op_host'] = (
+                'https://glance-worker1.openstack.org')
+            remote_hdrs = {'x-openstack-request-id': 'remote-req'}
+            mock_resp = mock.MagicMock(location='/target',
+                                       status_code=202,
+                                       reason='Thanks',
+                                       headers=remote_hdrs)
+            mock_client.return_value.delete.return_value = mock_resp
+            self.controller.delete(request, UUID4)
+
+        # Make sure we called the expected remote URL and passed
+        # no request body.
+        mock_client.return_value.delete.assert_called_once_with(
+            ('https://glance-worker1.openstack.org'
+             '/v2/images/%s') % UUID4,
+            json=None, timeout=60)
+
     def test_delete_not_allowed_by_policy(self):
         request = unit_test_utils.get_fake_request()
         with mock.patch.object(self.controller.policy, 'enforce') as mock_enf:
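
Note (reviewer commentary, not part of the patch): the change relies on a
glance.task_cancellation_tracker module exposing register_operation(),
is_canceled(), cancel_operation() and signal_finished(), which is not shown
in this diff. As a rough, illustrative sketch only, the interface the new
code assumes could be satisfied by something like the following; the real
module would need shared state (for example a common cache or directory) so
that a cancel issued on one API worker is visible to the worker running the
hash calculation:

    # Illustrative sketch of the tracker interface used above; NOT the
    # actual glance/task_cancellation_tracker.py implementation.
    _running = set()
    _canceled = set()

    def register_operation(operation_id):
        # Called by _CalculateHash._set_checksum_and_hash() before hashing.
        _running.add(operation_id)

    def is_canceled(operation_id):
        # Polled between data chunks in _CalculateHash._calculate_hash().
        return operation_id in _canceled

    def cancel_operation(operation_id):
        # Called by ImagesController.delete() when an image with an
        # in-progress hash calculation is deleted.
        _canceled.add(operation_id)

    def signal_finished(operation_id):
        # Called when the hash operation ends, from the finally block in
        # _set_checksum_and_hash() and from revert().
        _running.discard(operation_id)
        _canceled.discard(operation_id)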