From 8ac98ddf6170f66d4bdefac159b633c713a75dd5 Mon Sep 17 00:00:00 2001 From: Abhishek Kekane Date: Tue, 13 May 2025 13:28:31 +0000 Subject: [PATCH] Replace Eventlet with concurrent.futures - Removed dependencies on Eventlet for asynchronous execution - Implemented thread pool executor using 'concurrent.futures' - Updated code to submit tasks and manage futures with futures.ThreadPoolExecutor - Ensured proper waiting for task completion with futures.wait() Change-Id: Iec6893666c25e03c6d1a6246d41759dc4bff3b24 --- glance_store/_drivers/s3.py | 124 +++++++++++++++++++----------------- 1 file changed, 64 insertions(+), 60 deletions(-) diff --git a/glance_store/_drivers/s3.py b/glance_store/_drivers/s3.py index 95d4c84b..8388d078 100644 --- a/glance_store/_drivers/s3.py +++ b/glance_store/_drivers/s3.py @@ -15,6 +15,7 @@ """Storage backend for S3 or Storage Servers that follow the S3 Protocol""" +from concurrent import futures import io import logging import math @@ -32,7 +33,6 @@ except ImportError: boto_exceptions = None boto_utils = None -import eventlet from oslo_config import cfg from oslo_utils import encodeutils from oslo_utils import units @@ -726,71 +726,75 @@ class Store(glance_store.driver.Store): os_hash_value = utils.get_hasher(hashing_algo, False) checksum = utils.get_hasher('md5', False) pool_size = self.s3_store_thread_pools - pool = eventlet.greenpool.GreenPool(size=pool_size) - mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key) - upload_id = mpu['UploadId'] - LOG.debug("Multipart initiate key=%(key)s, UploadId=%(UploadId)s", - {'key': key, 'UploadId': upload_id}) - cstart = 0 - plist = [] + # Replace eventlet.GreenPool with ThreadPoolExecutor + with futures.ThreadPoolExecutor( + max_workers=pool_size) as executor: + # Create a list to store the futures + futures_list = [] + mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key) + upload_id = mpu['UploadId'] + LOG.debug("Multipart initiate key=%(key)s, UploadId=%(UploadId)s", + {'key': key, 'UploadId': upload_id}) + cstart = 0 + plist = [] - chunk_size = int(math.ceil(float(image_size) / MAX_PART_NUM)) - write_chunk_size = max(self.s3_store_large_object_chunk_size, - chunk_size) - it = utils.chunkreadable(image_file, self.WRITE_CHUNKSIZE) - buffered_chunk = b'' - while True: - try: - buffered_clen = len(buffered_chunk) - if buffered_clen < write_chunk_size: - # keep reading data - read_chunk = next(it) - buffered_chunk += read_chunk - continue - else: - write_chunk = buffered_chunk[:write_chunk_size] - remained_data = buffered_chunk[write_chunk_size:] - os_hash_value.update(write_chunk) - checksum.update(write_chunk) - if verifier: - verifier.update(write_chunk) - fp = io.BytesIO(write_chunk) - fp.seek(0) - part = UploadPart(mpu, fp, cstart + 1, len(write_chunk)) - pool.spawn_n(run_upload, s3_client, bucket, key, part) - plist.append(part) - cstart += 1 - buffered_chunk = remained_data - except StopIteration: - if len(buffered_chunk) > 0: - # Write the last chunk data - write_chunk = buffered_chunk - os_hash_value.update(write_chunk) - checksum.update(write_chunk) - if verifier: - verifier.update(write_chunk) - fp = io.BytesIO(write_chunk) - fp.seek(0) - part = UploadPart(mpu, fp, cstart + 1, len(write_chunk)) - pool.spawn_n(run_upload, s3_client, bucket, key, part) - plist.append(part) - break + chunk_size = int(math.ceil(float(image_size) / MAX_PART_NUM)) + write_chunk_size = max(self.s3_store_large_object_chunk_size, + chunk_size) + it = utils.chunkreadable(image_file, self.WRITE_CHUNKSIZE) + buffered_chunk = b'' + while True: + try: + buffered_clen = len(buffered_chunk) + if buffered_clen < write_chunk_size: + # keep reading data + read_chunk = next(it) + buffered_chunk += read_chunk + continue + else: + write_chunk = buffered_chunk[:write_chunk_size] + remained_data = buffered_chunk[write_chunk_size:] + os_hash_value.update(write_chunk) + checksum.update(write_chunk) + if verifier: + verifier.update(write_chunk) + fp = io.BytesIO(write_chunk) + fp.seek(0) + part = UploadPart( + mpu, fp, cstart + 1, len(write_chunk)) + # Spawn thread to upload part + futures_list.append(executor.submit( + run_upload, s3_client, bucket, key, part)) + plist.append(part) + cstart += 1 + buffered_chunk = remained_data + except StopIteration: + if len(buffered_chunk) > 0: + # Write the last chunk data + write_chunk = buffered_chunk + os_hash_value.update(write_chunk) + checksum.update(write_chunk) + if verifier: + verifier.update(write_chunk) + fp = io.BytesIO(write_chunk) + fp.seek(0) + part = UploadPart( + mpu, fp, cstart + 1, len(write_chunk)) + futures_list.append(executor.submit( + run_upload, s3_client, bucket, key, part)) + plist.append(part) + break - pedict = {} - total_size = 0 - pool.waitall() + # Wait for all uploads to finish + futures.wait(futures_list) - for part in plist: - pedict.update(part.etag) - total_size += part.size - - success = True - for part in plist: - if not part.success: - success = False + # Check success status + success = all(p.success for p in plist) + total_size = sum(p.size for p in plist) if success: # Complete + pedict = {p.partnum: p.etag[p.partnum] for p in plist} mpu_list = self._get_mpu_list(pedict) s3_client.complete_multipart_upload(Bucket=bucket, Key=key,