From 8f1f0705f79f9ac13c768aad5a604a51152b5b41 Mon Sep 17 00:00:00 2001
From: "James E. Blair"
Date: Wed, 6 May 2020 09:24:02 -0700
Subject: [PATCH] Handle blob upload races

If two uploads for the same digest race, we could have hit an error
when finalizing the upload.  The chunks are concatenated into the
final file and then removed -- that removal would raise an exception
if the chunk was already deleted.

To avoid this, we add a lockfile to the digest to detect the race
condition, and if we determine that we did not win the race, we
delete our upload files and return success, assuming the other
contestant won.

This lockfile has a 300 second timeout, so if the other upload ended
up failing, a later push can resolve the situation.

Change-Id: I1ed7e132ede44a8594aa5539e9caf2fcef030ac4
---
 zuul_registry/storage.py | 65 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/zuul_registry/storage.py b/zuul_registry/storage.py
index 0e98b71..44ef28d 100644
--- a/zuul_registry/storage.py
+++ b/zuul_registry/storage.py
@@ -216,7 +216,67 @@ class Storage:
         self._update_upload(namespace, uuid, upload)
         return upload.size - size, upload.size
 
+    def _delete_upload(self, upload, namespace, uuid):
+        """Delete the files for an upload
+
+        This is called when we have detected a race with another
+        upload for the same blob and have chosen to delete this upload
+        without finalizing.
+        """
+
+        for i, chunk in enumerate(upload.chunks):
+            src_path = os.path.join(namespace, 'uploads', uuid, str(i + 1))
+            self.backend.delete_object(src_path)
+        path = os.path.join(namespace, 'uploads', uuid, 'metadata')
+        self.backend.delete_object(path)
+
+    def _lock_upload(self, namespace, uuid, digest):
+        """Lock the upload
+
+        Place a metadata file in the blob directory so we can detect
+        whether we are racing another upload for the same blob.
+        """
+
+        # Check if the blob is locked
+        path = os.path.join(namespace, 'blobs', digest, 'metadata')
+        now = time.time()
+        current = self.backend.get_object(path)
+        waslocked = False
+        if current:
+            waslocked = True
+            current = json.loads(current.decode('utf8'))
+            locktime = int(current.get('time', 0))
+            if now - locktime < 300:
+                # The lock is in force, another simultaneous upload
+                # must be handling this; assume it will succeed and go
+                # ahead and clean up this upload.
+                self.log.warning("Failed to obtain lock(1) on digest %s "
+                                 "for upload %s", digest, uuid)
+                return False
+
+        # Lock the blob
+        metadata = dict(upload=uuid,
+                        time=now)
+        self.backend.put_object(path, json.dumps(metadata).encode('utf8'))
+        current = self.backend.get_object(path)
+        current = json.loads(current.decode('utf8'))
+        locktime = int(current.get('time', 0))
+        if (current.get('upload') != uuid and
+            now - locktime < 300):
+            # We lost a race for the lock, another simultaneous upload
+            # must be handling this; assume it will succeed and go
+            # ahead and clean up this upload.
+            self.log.warning("Failed to obtain lock(2) on digest %s "
+                             "for upload %s", digest, uuid)
+            return False
+
+        if waslocked:
+            self.log.warning("Breaking lock on digest %s "
+                             "for upload %s", digest, uuid)
+        return True
+
     def store_upload(self, namespace, uuid, digest):
         """Complete an upload.
 
         Verify the supplied digest matches the uploaded data, and if
@@ -228,6 +288,11 @@ class Storage:
         if digest != upload.digest:
             raise Exception('Digest does not match %s %s' %
                             (digest, upload.digest))
+
+        if not self._lock_upload(namespace, uuid, digest):
+            self._delete_upload(upload, namespace, uuid)
+            return
+
         # Move the chunks into the blob dir to get them out of the
         # uploads dir.
         chunks = []
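As an aside to the patch above: the put-then-read-back sequence in
_lock_upload is a best-effort compare-and-set for object stores that
offer no atomic locking primitive. Write a lock record, read it back,
and treat a fresh record naming a different upload as a lost race. The
minimal sketch below reproduces that check / write / verify sequence in
isolation; FakeBackend, try_lock, and LOCK_TIMEOUT are illustrative
names invented here, not part of zuul_registry, and only the locking
logic mirrors the patch.

import json
import time

LOCK_TIMEOUT = 300  # seconds, the same window the patch uses


class FakeBackend:
    """In-memory stand-in (hypothetical) for the storage backend."""

    def __init__(self):
        self.objects = {}

    def get_object(self, path):
        return self.objects.get(path)

    def put_object(self, path, data):
        self.objects[path] = data


def try_lock(backend, path, uuid, now=None):
    """Return True if we hold the lock at `path`, False if we lost a race.

    Same shape as _lock_upload: give up if a fresh lock exists, write
    our own record, then read it back to see if a concurrent writer won.
    """
    now = time.time() if now is None else now

    # If someone else's lock is younger than the timeout, assume their
    # upload is still in progress and give up.
    current = backend.get_object(path)
    if current:
        current = json.loads(current.decode('utf8'))
        if now - int(current.get('time', 0)) < LOCK_TIMEOUT:
            return False

    # Write our lock record, then read it back.  If a different
    # upload's record is there and still fresh, we lost the race.
    backend.put_object(
        path, json.dumps(dict(upload=uuid, time=now)).encode('utf8'))
    current = json.loads(backend.get_object(path).decode('utf8'))
    if (current.get('upload') != uuid and
            now - int(current.get('time', 0)) < LOCK_TIMEOUT):
        return False
    return True


if __name__ == '__main__':
    backend = FakeBackend()
    path = 'acme/blobs/sha256:abc/metadata'
    assert try_lock(backend, path, 'upload-1')       # first upload wins
    assert not try_lock(backend, path, 'upload-2')   # fresh lock, loses
    assert try_lock(backend, path, 'upload-3',       # stale lock is broken
                    now=time.time() + LOCK_TIMEOUT + 1)
    print('lock sketch behaves as expected')

Because both racing uploads carry the same digest, the winner produces
exactly the blob the loser would have produced, so the loser can safely
delete its chunks and report success; and because the lock carries a
timestamp rather than being permanent, a push arriving after the 300
second window can break a lock left behind by a failed upload.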