Upload files concurrently to swift.

The zuul swift uploader was uploading all files serially. Speed things
along by performing file uploads with a thread pool. This should do a
better job of utilizing networking throughput as we will always be
uploading as many files as possible (up to the supplied max).

Co-Authored-By: Joshua Hesketh <josh@nitrotech.org>
Change-Id: I38806edb2db1b55a6b4d0cdf7794ec00c9fe9a35
This commit is contained in:
Clark Boylan 2015-02-17 13:56:25 -08:00
parent 1cef694a3f
commit ad257862eb

View File

@ -20,12 +20,16 @@ credentials provided by zuul
"""
import argparse
import logging
import magic
import os
import Queue
import requests
import requests.exceptions
import stat
import sys
import tempfile
import threading
import time
DEBUG = True
@ -134,8 +138,8 @@ def get_folder_metadata(file_path, number_files=0):
return metadata
def swift_form_post_submit(file_list, url, hmac_body, signature):
"""Send the files to swift via the FormPost middleware"""
def swift_form_post_queue(file_list, url, hmac_body, signature):
"""Queue up files for processing via requests to FormPost middleware"""
# We are uploading the file_list as an HTTP POST multipart encoded.
# First grab out the information we need to send back from the hmac_body
@ -147,33 +151,30 @@ def swift_form_post_submit(file_list, url, hmac_body, signature):
payload['max_file_count'],
payload['expires']) = hmac_body.split('\n')
payload['signature'] = signature
try:
payload['max_file_size'] = int(payload['max_file_size'])
payload['max_file_count'] = int(payload['max_file_count'])
payload['expires'] = int(payload['expires'])
except ValueError:
raise Exception("HMAC Body contains unexpected (non-integer) data.")
# Loop over the file list in chunks of max_file_count
for sub_file_list in (file_list[pos:pos + int(payload['max_file_count'])]
for pos in xrange(0, len(file_list),
int(payload['max_file_count']))):
if payload['expires'] < time.time():
raise Exception("Ran out of time uploading files!")
files = {}
# Zuul's log path is sometimes generated without a tailing slash. As
# such the object prefix does not contain a slash and the files would
# be uploaded as 'prefix' + 'filename'. Assume we want the destination
# url to look like a folder and make sure there's a slash between.
filename_prefix = '/' if url[-1] != '/' else ''
for i, f in enumerate(sub_file_list):
if os.path.getsize(f['path']) > int(payload['max_file_size']):
sys.stderr.write('Warning: %s exceeds %d bytes. Skipping...\n'
% (f['path'], int(payload['max_file_size'])))
continue
files['file%d' % (i + 1)] = (filename_prefix + f['relative_name'],
open(f['path'], 'rb'),
get_file_mime(f['path']))
if DEBUG:
print "About to POST the following files..."
print sub_file_list
requests.post(url, data=payload, files=files)
if DEBUG:
print "Finished upload.."
queue = Queue.Queue()
# Zuul's log path is sometimes generated without a tailing slash. As
# such the object prefix does not contain a slash and the files would
# be uploaded as 'prefix' + 'filename'. Assume we want the destination
# url to look like a folder and make sure there's a slash between.
filename_prefix = '/' if url[-1] != '/' else ''
for i, f in enumerate(file_list):
if os.path.getsize(f['path']) > payload['max_file_size']:
sys.stderr.write('Warning: %s exceeds %d bytes. Skipping...\n'
% (f['path'], payload['max_file_size']))
continue
fileinfo = {'file01': (filename_prefix + f['relative_name'],
f['path'],
get_file_mime(f['path']))}
filejob = (url, payload, fileinfo)
queue.put(filejob)
return queue
def build_file_list(file_path, logserver_prefix, swift_destination_prefix,
@ -298,6 +299,51 @@ def build_file_list(file_path, logserver_prefix, swift_destination_prefix,
return file_list
class PostThread(threading.Thread):
"""Thread object to upload files to swift via form post"""
def __init__(self, queue):
super(PostThread, self).__init__()
self.queue = queue
def _post_file(self, url, payload, fileinfo):
if payload['expires'] < time.time():
raise Exception("Ran out of time uploading files!")
files = {}
for key in fileinfo.keys():
files[key] = (fileinfo[key][0],
open(fileinfo[key][1], 'rb'),
fileinfo[key][2])
requests.post(url, data=payload, files=files)
def run(self):
while True:
try:
job = self.queue.get_nowait()
self._post_file(*job)
except requests.exceptions.RequestException:
# Do our best to attempt to upload all the files
logging.exception("File posting error")
continue
except IOError:
# Do our best to attempt to upload all the files
logging.exception("Error opening file")
continue
except Queue.Empty:
# No more work to do
return
def swift_form_post(queue, num_threads):
"""Spin up thread pool to upload to swift"""
threads = []
for x in range(num_threads):
t = PostThread(queue)
threads.append(t)
t.start()
for t in threads:
t.join()
def grab_args():
"""Grab and return arguments"""
parser = argparse.ArgumentParser(
@ -389,8 +435,14 @@ if __name__ == '__main__':
print "List of files prepared to upload:"
print file_list
swift_form_post_submit(file_list, swift_url, swift_hmac_body,
swift_signature)
queue = swift_form_post_queue(file_list, swift_url, swift_hmac_body,
swift_signature)
max_file_count = int(swift_hmac_body.split('\n')[3])
# Attempt to upload at least one item
items_to_upload = max(queue.qsize(), 1)
# Cap number of threads to a reasonable number
num_threads = min(max_file_count, items_to_upload)
swift_form_post(queue, num_threads)
print os.path.join(logserver_prefix, swift_destination_prefix,
os.path.basename(index_file))