Merge "Cleanup unused blob store lock paths"
This commit is contained in:
@ -26,6 +26,7 @@ import uuid
|
||||
import zlib
|
||||
from unittest import mock
|
||||
|
||||
import fixtures
|
||||
import testtools
|
||||
|
||||
from zuul import model
|
||||
@ -74,6 +75,7 @@ from zuul.zk.zkobject import (
|
||||
)
|
||||
from zuul.zk.locks import tenant_write_lock
|
||||
|
||||
import kazoo.recipe.lock
|
||||
from kazoo.exceptions import ZookeeperError, OperationTimeoutError, NoNodeError
|
||||
|
||||
|
||||
@ -2240,6 +2242,14 @@ class TestConfigurationErrorList(ZooKeeperBaseTestCase):
|
||||
|
||||
|
||||
class TestBlobStore(ZooKeeperBaseTestCase):
|
||||
def _assertEmptyBlobStore(self, bs, path):
|
||||
with testtools.ExpectedException(KeyError):
|
||||
bs.get(path)
|
||||
|
||||
# Make sure all keys have been cleaned up
|
||||
keys = self.zk_client.client.get_children(bs.lock_root)
|
||||
self.assertEqual([], keys)
|
||||
|
||||
def test_blob_store(self):
|
||||
stop_event = threading.Event()
|
||||
self.zk_client.client.create('/zuul/pipeline', makepath=True)
|
||||
@ -2254,24 +2264,96 @@ class TestBlobStore(ZooKeeperBaseTestCase):
|
||||
with testtools.ExpectedException(KeyError):
|
||||
bs.get('nope')
|
||||
|
||||
path = bs.put(b'something')
|
||||
key = bs.put(b'something')
|
||||
|
||||
self.assertEqual(bs.get(path), b'something')
|
||||
self.assertEqual([x for x in bs], [path])
|
||||
self.assertEqual(bs.get(key), b'something')
|
||||
self.assertEqual([x for x in bs], [key])
|
||||
self.assertEqual(len(bs), 1)
|
||||
|
||||
self.assertTrue(path in bs)
|
||||
self.assertTrue(key in bs)
|
||||
self.assertFalse('nope' in bs)
|
||||
self.assertTrue(bs._checkKey(path))
|
||||
self.assertTrue(bs._checkKey(key))
|
||||
self.assertFalse(bs._checkKey('nope'))
|
||||
|
||||
cur_ltime = self.zk_client.getCurrentLtime()
|
||||
self.assertEqual(bs.getKeysLastUsedBefore(cur_ltime), {path})
|
||||
self.assertEqual(bs.getKeysLastUsedBefore(cur_ltime), {key})
|
||||
self.assertEqual(bs.getKeysLastUsedBefore(start_ltime), set())
|
||||
bs.delete(path, cur_ltime)
|
||||
# Test deletion
|
||||
bs.delete(key, cur_ltime)
|
||||
self._assertEmptyBlobStore(bs, key)
|
||||
|
||||
with testtools.ExpectedException(KeyError):
|
||||
bs.get(path)
|
||||
# Put the blob back and test cleanup
|
||||
key = bs.put(b'something')
|
||||
live_blobs = set()
|
||||
cur_ltime = self.zk_client.getCurrentLtime()
|
||||
bs.cleanup(cur_ltime, live_blobs)
|
||||
self._assertEmptyBlobStore(bs, key)
|
||||
|
||||
# Test leaked lock dir cleanup
|
||||
self.zk_client.client.create(bs._getLockPath(key))
|
||||
bs.lock_grace_period = 0
|
||||
cur_ltime = self.zk_client.getCurrentLtime()
|
||||
bs.cleanup(cur_ltime, live_blobs)
|
||||
self._assertEmptyBlobStore(bs, key)
|
||||
|
||||
def test_blob_store_lock_cleanup_race(self):
|
||||
# The blob store lock cleanup can have a race condition as follows:
|
||||
# [1] Put a blob
|
||||
# [1] Delete the blob but fail to delete the lock path for the blob
|
||||
# [2] Start to put the blob it a second time
|
||||
# [2] Ensure the lock path exists
|
||||
# [1] Delete the leaked lock path
|
||||
# [2] Fail to create the lock because the lock path does not exist
|
||||
# To address this, we will retry the lock if it fails with
|
||||
# NoNodeError. This test verifies that behavior.
|
||||
stop_event = threading.Event()
|
||||
self.zk_client.client.create('/zuul/pipeline', makepath=True)
|
||||
# Create a new object
|
||||
tenant_name = 'fake_tenant'
|
||||
|
||||
created_event = threading.Event()
|
||||
deleted_event = threading.Event()
|
||||
orig_ensure_path = kazoo.recipe.lock.Lock._ensure_path
|
||||
|
||||
def _create_blob(bs):
|
||||
bs.put(b'something')
|
||||
|
||||
def _ensure_path(*args, **kw):
|
||||
orig_ensure_path(*args, **kw)
|
||||
created_event.set()
|
||||
deleted_event.wait()
|
||||
|
||||
with (tenant_write_lock(self.zk_client, tenant_name) as lock,
|
||||
ZKContext(
|
||||
self.zk_client, lock, stop_event, self.log) as context):
|
||||
bs = BlobStore(context)
|
||||
|
||||
# Get the key
|
||||
key = bs.put(b'something')
|
||||
cur_ltime = self.zk_client.getCurrentLtime()
|
||||
bs.delete(key, cur_ltime)
|
||||
|
||||
# Recreate the lock dir
|
||||
self.zk_client.client.create(bs._getLockPath(key))
|
||||
# Block the lock method so we can delete from under it
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'kazoo.recipe.lock.Lock._ensure_path',
|
||||
_ensure_path))
|
||||
# Start recreating the blob
|
||||
thread = threading.Thread(target=_create_blob, args=(bs,))
|
||||
thread.start()
|
||||
created_event.wait()
|
||||
# Run the cleanup
|
||||
live_blobs = set()
|
||||
bs.lock_grace_period = 0
|
||||
cur_ltime = self.zk_client.getCurrentLtime()
|
||||
bs.cleanup(cur_ltime, live_blobs)
|
||||
self._assertEmptyBlobStore(bs, key)
|
||||
# Finish recreating the blob
|
||||
deleted_event.set()
|
||||
thread.join()
|
||||
# Ensure the blob exists
|
||||
self.assertEqual(bs.get(key), b'something')
|
||||
|
||||
|
||||
class TestPipelineInit(ZooKeeperBaseTestCase):
|
||||
|
73
tools/cleanup-blobstore.py
Normal file
73
tools/cleanup-blobstore.py
Normal file
@ -0,0 +1,73 @@
|
||||
# Copyright 2024 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Run this to clean up leaked lock entries in the blob store (but only
|
||||
if there are too many for Zuul to deal with on its own).
|
||||
|
||||
Run this command to get a list of potential keys:
|
||||
|
||||
./bin/zkSnapShotToolkit.sh /data/version-2/snapshot.XXX \
|
||||
| grep /zuul/cache/blob/lock
|
||||
|
||||
Pass the result as input to this script.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
import kazoo.client
|
||||
from kazoo.exceptions import NotEmptyError
|
||||
|
||||
|
||||
class Cleanup:
|
||||
def __init__(self, args):
|
||||
kwargs = {}
|
||||
if args.cert:
|
||||
kwargs['use_ssl'] = True
|
||||
kwargs['keyfile'] = args.key
|
||||
kwargs['certfile'] = args.cert
|
||||
kwargs['ca'] = args.ca
|
||||
self.client = kazoo.client.KazooClient(args.host, **kwargs)
|
||||
self.client.start()
|
||||
|
||||
def run(self):
|
||||
prefix = '/zuul/cache/blob/lock/'
|
||||
|
||||
for line in sys.stdin:
|
||||
line = line.strip()
|
||||
if not line.startswith(prefix):
|
||||
continue
|
||||
key = line[len(prefix):]
|
||||
|
||||
blob_path = f"/zuul/cache/blob/data/{key[0:2]}/{key}"
|
||||
lock_path = f"/zuul/cache/blob/lock/{key}"
|
||||
if self.client.exists(blob_path):
|
||||
continue
|
||||
try:
|
||||
self.client.delete(lock_path)
|
||||
except NotEmptyError:
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('host', help='ZK host string')
|
||||
parser.add_argument('--cert', help='Path to TLS certificate')
|
||||
parser.add_argument('--key', help='Path to TLS key')
|
||||
parser.add_argument('--ca', help='Path to TLS CA cert')
|
||||
args = parser.parse_args()
|
||||
|
||||
clean = Cleanup(args)
|
||||
clean.run()
|
@ -849,15 +849,7 @@ class Scheduler(threading.Thread):
|
||||
live_blobs.update(item.getBlobKeys())
|
||||
with self.createZKContext(None, self.log) as ctx:
|
||||
blobstore = BlobStore(ctx)
|
||||
# get the set of blob keys unused since the start time
|
||||
# (ie, we have already filtered any newly added keys)
|
||||
unused_blobs = blobstore.getKeysLastUsedBefore(start_ltime)
|
||||
# remove the current refences
|
||||
unused_blobs -= live_blobs
|
||||
# delete what's left
|
||||
for key in unused_blobs:
|
||||
self.log.debug("Deleting unused blob: %s", key)
|
||||
blobstore.delete(key, start_ltime)
|
||||
blobstore.cleanup(start_ltime, live_blobs)
|
||||
self.log.debug("Finished blob store cleanup")
|
||||
except Exception:
|
||||
self.log.exception("Error in blob store cleanup:")
|
||||
|
@ -1,5 +1,5 @@
|
||||
# Copyright 2020 BMW Group
|
||||
# Copyright 2022 Acme Gating, LLC
|
||||
# Copyright 2022, 2024 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -14,9 +14,10 @@
|
||||
# under the License.
|
||||
|
||||
import hashlib
|
||||
import time
|
||||
import zlib
|
||||
|
||||
from kazoo.exceptions import NoNodeError
|
||||
from kazoo.exceptions import NoNodeError, NotEmptyError
|
||||
from kazoo.retry import KazooRetry
|
||||
|
||||
from zuul.zk.locks import locked, SessionAwareLock
|
||||
@ -26,6 +27,7 @@ from zuul.zk import sharding
|
||||
|
||||
class BlobStore:
|
||||
_retry_interval = 5
|
||||
lock_grace_period = 300
|
||||
data_root = "/zuul/cache/blob/data"
|
||||
lock_root = "/zuul/cache/blob/lock"
|
||||
|
||||
@ -43,6 +45,9 @@ class BlobStore:
|
||||
root = self._getRootPath(key)
|
||||
return f"{root}/complete"
|
||||
|
||||
def _getLockPath(self, key):
|
||||
return f"{self.lock_root}/{key}"
|
||||
|
||||
def _retry(self, context, func, *args, max_tries=-1, **kw):
|
||||
kazoo_retry = KazooRetry(max_tries=max_tries,
|
||||
interrupt=context.sessionIsInvalid,
|
||||
@ -81,7 +86,7 @@ class BlobStore:
|
||||
# the store, it also touches the flag file so that the cleanup
|
||||
# routine can know the last time an entry was used. Because
|
||||
# of the critical section between the get and delete calls in
|
||||
# the delete method, this much be called with the key lock.
|
||||
# the delete method, this must be called with the key lock.
|
||||
flag = self._getFlagPath(key)
|
||||
|
||||
if self.context.sessionIsInvalid():
|
||||
@ -121,11 +126,11 @@ class BlobStore:
|
||||
|
||||
path = self._getPath(key)
|
||||
flag = self._getFlagPath(key)
|
||||
|
||||
lock_path = self._getLockPath(key)
|
||||
with locked(
|
||||
SessionAwareLock(
|
||||
self.context.client,
|
||||
f"{self.lock_root}/{key}"),
|
||||
lock_path),
|
||||
blocking=True
|
||||
) as lock:
|
||||
if self._checkKey(key):
|
||||
@ -145,13 +150,15 @@ class BlobStore:
|
||||
def delete(self, key, ltime):
|
||||
path = self._getRootPath(key)
|
||||
flag = self._getFlagPath(key)
|
||||
lock_path = self._getLockPath(key)
|
||||
if self.context.sessionIsInvalid():
|
||||
raise Exception("ZooKeeper session or lock not valid")
|
||||
deleted = False
|
||||
try:
|
||||
with locked(
|
||||
SessionAwareLock(
|
||||
self.context.client,
|
||||
f"{self.lock_root}/{key}"),
|
||||
lock_path),
|
||||
blocking=True
|
||||
) as lock:
|
||||
# make a new context based on the old one
|
||||
@ -167,8 +174,17 @@ class BlobStore:
|
||||
if zstat.last_modified_transaction_id < ltime:
|
||||
self._retry(locked_context, self.context.client.delete,
|
||||
path, recursive=True)
|
||||
deleted = True
|
||||
except NoNodeError:
|
||||
raise KeyError(key)
|
||||
if deleted:
|
||||
try:
|
||||
lock_path = self._getLockPath(key)
|
||||
self._retry(self.context, self.context.client.delete,
|
||||
lock_path)
|
||||
except Exception:
|
||||
self.context.log.exception(
|
||||
"Error deleting lock path %s:", lock_path)
|
||||
|
||||
def __iter__(self):
|
||||
try:
|
||||
@ -196,3 +212,79 @@ class BlobStore:
|
||||
if zstat.last_modified_transaction_id < ltime:
|
||||
ret.add(key)
|
||||
return ret
|
||||
|
||||
def cleanupLockDirs(self, start_ltime, live_blobs):
|
||||
# This cleanup was not present in an earlier version of Zuul,
|
||||
# therefore lock directory entries could grow without bound.
|
||||
# If there are too many entries, we won't be able to list them
|
||||
# in order to delete them and the connection will be closed.
|
||||
# Before we proceed, make sure that isn't the case here.
|
||||
# The size of the packet will be:
|
||||
# (num_children * (hash_length + int_size))
|
||||
# (num_children * (64 + 4)) = num_children * 68
|
||||
max_children = sharding.NODE_BYTE_SIZE_LIMIT / 68
|
||||
zstat = self._retry(self.context, self.context.client.exists,
|
||||
self.lock_root)
|
||||
if not zstat:
|
||||
# Lock root does not exist
|
||||
return
|
||||
if zstat.children_count > max_children:
|
||||
self.context.log.error(
|
||||
"Unable to cleanup blob store lock directory "
|
||||
"due to too many lock znodes.")
|
||||
return
|
||||
|
||||
# We're not retrying here just in case we calculate wrong and
|
||||
# we get a ZK disconnection. We could change this in the
|
||||
# future when we're no longer worried about the number of
|
||||
# children.
|
||||
keys = self.context.client.get_children(self.lock_root)
|
||||
for key in keys:
|
||||
if key in live_blobs:
|
||||
# No need to check a live blob
|
||||
continue
|
||||
path = self._getLockPath(key)
|
||||
zstat = self._retry(self.context, self.context.client.exists,
|
||||
path)
|
||||
if not zstat:
|
||||
continue
|
||||
# Any lock dir that is not for a live blob is either:
|
||||
# 1) leaked
|
||||
# created time will be old, okay to delete
|
||||
# 2) is for a newly created blob
|
||||
# created time will be new, not okay to delete
|
||||
# 3) is for a previously used blob about to be re-created
|
||||
# created time will be old, not okay to delete
|
||||
# We can not detect case 3, but it is unlikely to happen,
|
||||
# and if it does, the locked context manager in the put
|
||||
# method will recreate the lock directory after we delete
|
||||
# it.
|
||||
now = time.time()
|
||||
if ((zstat.created > now - self.lock_grace_period) or
|
||||
(zstat.creation_transaction_id > start_ltime)):
|
||||
# The lock was recently created, we may have caught it
|
||||
# while it's being created.
|
||||
continue
|
||||
try:
|
||||
self.context.log.debug("Deleting unused key dir: %s", path)
|
||||
self.context.client.delete(path)
|
||||
except NotEmptyError:
|
||||
# It may still be in use
|
||||
pass
|
||||
except NoNodeError:
|
||||
pass
|
||||
except Exception:
|
||||
self.context.log.exception(
|
||||
"Error deleting lock path %s:", path)
|
||||
|
||||
def cleanup(self, start_ltime, live_blobs):
|
||||
# get the set of blob keys unused since the start time
|
||||
# (ie, we have already filtered any newly added keys)
|
||||
unused_blobs = self.getKeysLastUsedBefore(start_ltime)
|
||||
# remove the current refences
|
||||
unused_blobs -= live_blobs
|
||||
# delete what's left
|
||||
for key in unused_blobs:
|
||||
self.context.log.debug("Deleting unused blob: %s", key)
|
||||
self.delete(key, start_ltime)
|
||||
self.cleanupLockDirs(start_ltime, live_blobs)
|
||||
|
@ -109,8 +109,15 @@ class SessionAwareReadLock(SessionAwareMixin, ReadLock):
|
||||
|
||||
@contextmanager
|
||||
def locked(lock, blocking=True, timeout=None):
|
||||
if not lock.acquire(blocking=blocking, timeout=timeout):
|
||||
raise LockException(f"Failed to acquire lock {lock}")
|
||||
try:
|
||||
if not lock.acquire(blocking=blocking, timeout=timeout):
|
||||
raise LockException(f"Failed to acquire lock {lock}")
|
||||
except NoNodeError:
|
||||
# If we encounter a NoNodeError when locking, try one more
|
||||
# time in case we're racing the cleanup of a lock directory.
|
||||
lock.assured_path = False
|
||||
if not lock.acquire(blocking=blocking, timeout=timeout):
|
||||
raise LockException(f"Failed to acquire lock {lock}")
|
||||
try:
|
||||
yield lock
|
||||
finally:
|
||||
|
Reference in New Issue
Block a user