Handle blob upload races
If two uploads for the same digest race, we could hit an error when finalizing the upload. The chunks are concatenated into the final file and then removed -- that removal would raise an exception if a chunk had already been deleted. To avoid this, we add a lockfile to the digest's blob directory to detect the race condition; if we determine that we did not win the race, we delete our upload files and return success, assuming the other contestant won. The lockfile has a 300 second timeout, so if the other upload ends up failing, a later push can resolve the situation. Change-Id: I1ed7e132ede44a8594aa5539e9caf2fcef030ac4
This commit is contained in:
parent
7ba90a1cd4
commit
8f1f0705f7
|
@ -216,7 +216,67 @@ class Storage:
|
|||
self._update_upload(namespace, uuid, upload)
|
||||
return upload.size - size, upload.size
|
||||
|
||||
def _delete_upload(self, upload, namespace, uuid):
|
||||
"""Delete the files for an upload
|
||||
|
||||
This is called when we have detected a race with another
|
||||
upload for the same blob and have chosen to delete this upload
|
||||
without finalizing.
|
||||
"""
|
||||
|
||||
for i, chunk in enumerate(upload.chunks):
|
||||
src_path = os.path.join(namespace, 'uploads', uuid, str(i + 1))
|
||||
self.backend.delete_object(src_path)
|
||||
path = os.path.join(namespace, 'uploads', uuid, 'metadata')
|
||||
self.backend.delete_object(path)
|
||||
|
||||
def _lock_upload(self, namespace, uuid, digest):
|
||||
"""Lock the upload
|
||||
|
||||
Place a metadata file in the blob directory so we can detect
|
||||
whether we are racing another upload for the same blob.
|
||||
"""
|
||||
|
||||
# Check if the blob is locked
|
||||
path = os.path.join(namespace, 'blobs', digest, 'metadata')
|
||||
now = time.time()
|
||||
current = self.backend.get_object(path)
|
||||
waslocked = False
|
||||
if current:
|
||||
waslocked = True
|
||||
current = json.loads(current.decode('utf8'))
|
||||
locktime = int(current.get('time', 0))
|
||||
if now - locktime < 300:
|
||||
# The lock is in force, another simultaneous upload
|
||||
# must be handling this; assume it will succeed and go
|
||||
# ahead and clean up this upload.
|
||||
self.log.warning("Failed to obtain lock(1) on digest %s "
|
||||
"for upload %s", digest, uuid)
|
||||
return False
|
||||
|
||||
# Lock the blob
|
||||
metadata = dict(upload=uuid,
|
||||
time=now)
|
||||
self.backend.put_object(path, json.dumps(metadata).encode('utf8'))
|
||||
current = self.backend.get_object(path)
|
||||
current = json.loads(current.decode('utf8'))
|
||||
locktime = int(current.get('time', 0))
|
||||
if (current.get('upload') != uuid and
|
||||
now - locktime < 300):
|
||||
# We lost a race for the lock, another simultaneous upload
|
||||
# must be handling this; assume it will succeed and go
|
||||
# ahead and clean up this upload.
|
||||
self.log.warning("Failed to obtain lock(2) on digest %s "
|
||||
"for upload %s", digest, uuid)
|
||||
return False
|
||||
|
||||
if waslocked:
|
||||
self.log.warning("Breaking lock on digest %s "
|
||||
"for upload %s", digest, uuid)
|
||||
return True
|
||||
|
||||
def store_upload(self, namespace, uuid, digest):
|
||||
|
||||
"""Complete an upload.
|
||||
|
||||
Verify the supplied digest matches the uploaded data, and if
|
||||
|
@ -228,6 +288,11 @@ class Storage:
|
|||
if digest != upload.digest:
|
||||
raise Exception('Digest does not match %s %s' %
|
||||
(digest, upload.digest))
|
||||
|
||||
if not self._lock_upload(namespace, uuid, digest):
|
||||
self._delete_upload(upload, namespace, uuid)
|
||||
return
|
||||
|
||||
# Move the chunks into the blob dir to get them out of the
|
||||
# uploads dir.
|
||||
chunks = []
|
||||
|
|
Loading…
Reference in New Issue