Replace Eventlet with concurrent.futures
- Removed dependencies on Eventlet for asynchronous execution
- Implemented thread pool executor using 'concurrent.futures'
- Updated code to submit tasks and manage futures with
  futures.ThreadPoolExecutor
- Ensured proper waiting for task completion with futures.wait()

Change-Id: Iec6893666c25e03c6d1a6246d41759dc4bff3b24
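For reference, here is a minimal, self-contained sketch of the submit-and-wait
pattern this change adopts (illustrative only: upload_part, upload_all and the
sample chunks are hypothetical stand-ins for the driver's run_upload helper and
its real part data):

    from concurrent import futures

    def upload_part(part_num, data):
        # Hypothetical stand-in for the driver's run_upload() helper; a real
        # implementation would send this data to S3 for the given part number.
        return part_num, len(data)

    def upload_all(chunks, pool_size=10):
        # eventlet.greenpool.GreenPool(size=pool_size) + pool.spawn_n(...)
        # becomes ThreadPoolExecutor(max_workers=pool_size) + executor.submit().
        with futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
            futures_list = [
                executor.submit(upload_part, i + 1, chunk)
                for i, chunk in enumerate(chunks)
            ]
            # pool.waitall() becomes futures.wait(futures_list), which blocks
            # until every submitted task has completed.
            futures.wait(futures_list)
        return [f.result() for f in futures_list]

    if __name__ == '__main__':
        print(upload_all([b'abc', b'defg', b'hi']))

Unlike pool.spawn_n(), executor.submit() returns a Future, so the result or
exception of each part upload stays inspectable after futures.wait() returns.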
@@ -15,6 +15,7 @@
 """Storage backend for S3 or Storage Servers that follow the S3 Protocol"""
 
+from concurrent import futures
 import io
 import logging
 import math
@@ -32,7 +33,6 @@ except ImportError:
     boto_exceptions = None
     boto_utils = None
 
-import eventlet
 from oslo_config import cfg
 from oslo_utils import encodeutils
 from oslo_utils import units
@@ -726,71 +726,75 @@ class Store(glance_store.driver.Store):
         os_hash_value = utils.get_hasher(hashing_algo, False)
         checksum = utils.get_hasher('md5', False)
         pool_size = self.s3_store_thread_pools
-        pool = eventlet.greenpool.GreenPool(size=pool_size)
-        mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
-        upload_id = mpu['UploadId']
-        LOG.debug("Multipart initiate key=%(key)s, UploadId=%(UploadId)s",
-                  {'key': key, 'UploadId': upload_id})
-        cstart = 0
-        plist = []
+        # Replace eventlet.GreenPool with ThreadPoolExecutor
+        with futures.ThreadPoolExecutor(
+                max_workers=pool_size) as executor:
+            # Create a list to store the futures
+            futures_list = []
+            mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
+            upload_id = mpu['UploadId']
+            LOG.debug("Multipart initiate key=%(key)s, UploadId=%(UploadId)s",
+                      {'key': key, 'UploadId': upload_id})
+            cstart = 0
+            plist = []
 
-        chunk_size = int(math.ceil(float(image_size) / MAX_PART_NUM))
-        write_chunk_size = max(self.s3_store_large_object_chunk_size,
-                               chunk_size)
-        it = utils.chunkreadable(image_file, self.WRITE_CHUNKSIZE)
-        buffered_chunk = b''
-        while True:
-            try:
-                buffered_clen = len(buffered_chunk)
-                if buffered_clen < write_chunk_size:
-                    # keep reading data
-                    read_chunk = next(it)
-                    buffered_chunk += read_chunk
-                    continue
-                else:
-                    write_chunk = buffered_chunk[:write_chunk_size]
-                    remained_data = buffered_chunk[write_chunk_size:]
-                    os_hash_value.update(write_chunk)
-                    checksum.update(write_chunk)
-                    if verifier:
-                        verifier.update(write_chunk)
-                    fp = io.BytesIO(write_chunk)
-                    fp.seek(0)
-                    part = UploadPart(mpu, fp, cstart + 1, len(write_chunk))
-                    pool.spawn_n(run_upload, s3_client, bucket, key, part)
-                    plist.append(part)
-                    cstart += 1
-                    buffered_chunk = remained_data
-            except StopIteration:
-                if len(buffered_chunk) > 0:
-                    # Write the last chunk data
-                    write_chunk = buffered_chunk
-                    os_hash_value.update(write_chunk)
-                    checksum.update(write_chunk)
-                    if verifier:
-                        verifier.update(write_chunk)
-                    fp = io.BytesIO(write_chunk)
-                    fp.seek(0)
-                    part = UploadPart(mpu, fp, cstart + 1, len(write_chunk))
-                    pool.spawn_n(run_upload, s3_client, bucket, key, part)
-                    plist.append(part)
-                break
+            chunk_size = int(math.ceil(float(image_size) / MAX_PART_NUM))
+            write_chunk_size = max(self.s3_store_large_object_chunk_size,
+                                   chunk_size)
+            it = utils.chunkreadable(image_file, self.WRITE_CHUNKSIZE)
+            buffered_chunk = b''
+            while True:
+                try:
+                    buffered_clen = len(buffered_chunk)
+                    if buffered_clen < write_chunk_size:
+                        # keep reading data
+                        read_chunk = next(it)
+                        buffered_chunk += read_chunk
+                        continue
+                    else:
+                        write_chunk = buffered_chunk[:write_chunk_size]
+                        remained_data = buffered_chunk[write_chunk_size:]
+                        os_hash_value.update(write_chunk)
+                        checksum.update(write_chunk)
+                        if verifier:
+                            verifier.update(write_chunk)
+                        fp = io.BytesIO(write_chunk)
+                        fp.seek(0)
+                        part = UploadPart(
+                            mpu, fp, cstart + 1, len(write_chunk))
+                        # Spawn thread to upload part
+                        futures_list.append(executor.submit(
+                            run_upload, s3_client, bucket, key, part))
+                        plist.append(part)
+                        cstart += 1
+                        buffered_chunk = remained_data
+                except StopIteration:
+                    if len(buffered_chunk) > 0:
+                        # Write the last chunk data
+                        write_chunk = buffered_chunk
+                        os_hash_value.update(write_chunk)
+                        checksum.update(write_chunk)
+                        if verifier:
+                            verifier.update(write_chunk)
+                        fp = io.BytesIO(write_chunk)
+                        fp.seek(0)
+                        part = UploadPart(
+                            mpu, fp, cstart + 1, len(write_chunk))
+                        futures_list.append(executor.submit(
+                            run_upload, s3_client, bucket, key, part))
+                        plist.append(part)
+                    break
 
-        pedict = {}
-        total_size = 0
-        pool.waitall()
+            # Wait for all uploads to finish
+            futures.wait(futures_list)
 
-        for part in plist:
-            pedict.update(part.etag)
-            total_size += part.size
-
-        success = True
-        for part in plist:
-            if not part.success:
-                success = False
+            # Check success status
+            success = all(p.success for p in plist)
+            total_size = sum(p.size for p in plist)
 
         if success:
             # Complete
+            pedict = {p.partnum: p.etag[p.partnum] for p in plist}
             mpu_list = self._get_mpu_list(pedict)
             s3_client.complete_multipart_upload(Bucket=bucket,
                                                 Key=key,