Add a blob store and store large secrets in it

To optimize for the case where the same large secrets are used in
every job, this adds a global blob store and offloads secrets into
it.

Because this adds extra ZK operations, it is only used for secrets
which are > 10kb.

The blob store is pruned every hour by looking for entries which
are not in use and also have not been referenced since the start
of the cleanup process.

Change-Id: I43f08bc525d7f7b98ba74727c534489339432e8e
This commit is contained in:
James E. Blair 2022-03-26 17:03:45 -07:00
parent 08348143f5
commit 5415c40989
11 changed files with 402 additions and 9 deletions

View File

@ -70,3 +70,12 @@ Version 6
:Prior Zuul version: 5.2.0
:Description: Stores the complete layout min_ltimes in /zuul/layout-data.
This only affects schedulers.
Version 7
---------
:Prior Zuul version: 5.2.2
:Description: Adds the blob store and stores large secrets in it.
Playbook secret references are now either an integer
index into the job secret list, or a dict with a blob
store key. This affects schedulers and executors.

View File

@ -149,6 +149,18 @@ This is a reference for object layout in Zookeeper.
These are sharded JSON blobs of the change data.
.. path:: zuul/cache/blob/data
Data for the blob store. These nodes are identified by a
sha256sum of the secret content.
These are sharded blobs of data.
.. path:: zuul/cache/blob/lock
Side-channel lock directory for the blob store. The store locks
by key id under this znode when writing.
.. path:: zuul/cleanup
This node holds locks for the cleanup routines to make sure that

View File

@ -177,6 +177,10 @@ class TestModelUpgrade(ZuulTestCase):
self.assertEqual(first.sched.local_layout_state.get("tenant-one"),
second.sched.local_layout_state.get("tenant-one"))
# No test for model version 7 (secrets in blob store): old and new
# code paths are exercised in existing tests since small secrets
# don't use the blob store.
class TestSemaphoreModelUpgrade(ZuulTestCase):
tenant_config_file = 'config/semaphore/main.yaml'

View File

@ -25,12 +25,14 @@ from time import sleep
from unittest import skip, skipIf
from zuul.lib import yamlutil
import fixtures
import git
import paramiko
import zuul.configloader
from zuul.lib import yamlutil as yaml
from zuul.model import MergeRequest
from zuul.zk.blob_store import BlobStore
from tests.base import (
AnsibleZuulTestCase,
@ -5641,6 +5643,39 @@ class TestSecrets(ZuulTestCase):
self._getSecrets('project2-complex', 'playbooks'),
[secret])
def test_blobstore_secret(self):
# Test the large secret blob store
self.executor_server.hold_jobs_in_build = True
self.useFixture(fixtures.MonkeyPatch(
'zuul.model.Job.SECRET_BLOB_SIZE',
1))
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
context = self.scheds.first.sched.createZKContext(None, self.log)
bs = BlobStore(context)
self.assertEqual(len(bs), 1)
self.scheds.first.sched._runBlobStoreCleanup()
self.assertEqual(len(bs), 1)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(A.reported, 1, "A should report success")
self.assertHistory([
dict(name='project1-secret', result='SUCCESS', changes='1,1'),
])
self.assertEqual(
[{'secret_name': self.secret}],
self._getSecrets('project1-secret', 'playbooks'))
self.scheds.first.sched._runBlobStoreCleanup()
self.assertEqual(len(bs), 0)
class TestSecretInheritance(ZuulTestCase):
tenant_config_file = 'config/secret-inheritance/main.yaml'

View File

@ -25,6 +25,7 @@ from zuul import model
from zuul.lib import yamlutil as yaml
from zuul.model import BuildRequest, HoldRequest, MergeRequest
from zuul.zk import ZooKeeperClient
from zuul.zk.blob_store import BlobStore
from zuul.zk.branch_cache import BranchCache
from zuul.zk.change_cache import (
AbstractChangeCache,
@ -1980,3 +1981,37 @@ class TestConfigurationErrorList(ZooKeeperBaseTestCase):
self.assertEqual(el1.errors[0], e1)
self.assertNotEqual(e1, e2)
self.assertEqual([e1, e2], [e1, e2])
class TestBlobStore(ZooKeeperBaseTestCase):
def test_blob_store(self):
stop_event = threading.Event()
self.zk_client.client.create('/zuul/pipeline', makepath=True)
# Create a new object
tenant_name = 'fake_tenant'
start_ltime = self.zk_client.getCurrentLtime()
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
bs = BlobStore(context)
with testtools.ExpectedException(KeyError):
bs.get('nope')
path = bs.put(b'something')
self.assertEqual(bs.get(path), b'something')
self.assertEqual([x for x in bs], [path])
self.assertEqual(len(bs), 1)
self.assertTrue(path in bs)
self.assertFalse('nope' in bs)
self.assertTrue(bs._checkKey(path))
self.assertFalse(bs._checkKey('nope'))
cur_ltime = self.zk_client.getCurrentLtime()
self.assertEqual(bs.getKeysLastUsedBefore(cur_ltime), {path})
self.assertEqual(bs.getKeysLastUsedBefore(start_ltime), set())
bs.delete(path, cur_ltime)
with testtools.ExpectedException(KeyError):
bs.get(path)

View File

@ -74,6 +74,7 @@ import zuul.model
from zuul.nodepool import Nodepool
from zuul.version import get_version_string
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.blob_store import BlobStore
from zuul.zk.components import ExecutorComponent, COMPONENT_REGISTRY
from zuul.zk.exceptions import JobRequestNotFound
from zuul.zk.executor import ExecutorApi
@ -2121,8 +2122,14 @@ class AnsibleJob(object):
"""
ret = {}
blobstore = BlobStore(self.executor_server.zk_context)
for secret_name, secret_index in secrets.items():
frozen_secret = self.job.secrets[secret_index]
if isinstance(secret_index, dict):
key = secret_index['blob']
data = blobstore.get(key)
frozen_secret = json.loads(data.decode('utf-8'))
else:
frozen_secret = self.job.secrets[secret_index]
secret = zuul.model.Secret(secret_name, None)
secret.secret_data = yaml.encrypted_load(
frozen_secret['encrypted_data'])

View File

@ -46,6 +46,7 @@ from zuul.lib.logutil import get_annotated_logger
from zuul.lib.capabilities import capabilities_registry
from zuul.lib.jsonutil import json_dumps
from zuul.zk import zkobject
from zuul.zk.blob_store import BlobStore
from zuul.zk.change_cache import ChangeKey
from zuul.zk.components import COMPONENT_REGISTRY
@ -2280,6 +2281,12 @@ class FrozenJob(zkobject.ZKObject):
_artifact_data=self._makeJobData(
context, 'artifact_data', artifact_data))
@property
def all_playbooks(self):
for k in ('pre_run', 'run', 'post_run', 'cleanup_run'):
playbooks = getattr(self, k)
yield from playbooks
class Job(ConfigObject):
"""A Job represents the defintion of actions to perform.
@ -2298,6 +2305,8 @@ class Job(ConfigObject):
empty_nodeset = NodeSet()
BASE_JOB_MARKER = object()
# Secrets larger than this size will be put in the blob store
SECRET_BLOB_SIZE = 10 * 1024
def isBase(self):
return self.parent is self.BASE_JOB_MARKER
@ -2490,17 +2499,35 @@ class Job(ConfigObject):
role['project'] = role_project.name
return d
def _deduplicateSecrets(self, secrets, playbook):
def _deduplicateSecrets(self, context, secrets, playbook):
# secrets is a list of secrets accumulated so far
# playbook is a frozen playbook from _freezePlaybook
# At the end of this method, the values in the playbook
# secrets dictionary will be mutated to either be an integer
# (which is an index into the job's secret list) or a dict
# (which contains a pointer to a key in the global blob
# store).
blobstore = BlobStore(context)
# Cast to list so we can modify in place
for secret_key, secret_value in list(playbook['secrets'].items()):
if secret_value in secrets:
playbook['secrets'][secret_key] = secrets.index(secret_value)
secret_serialized = json_dumps(
secret_value, sort_keys=True).encode("utf8")
if (COMPONENT_REGISTRY.model_api >= 6 and
len(secret_serialized) > self.SECRET_BLOB_SIZE):
# If the secret is large, store it in the blob store
# and store the key in the playbook secrets dict.
blob_key = blobstore.put(secret_serialized)
playbook['secrets'][secret_key] = {'blob': blob_key}
else:
secrets.append(secret_value)
playbook['secrets'][secret_key] = len(secrets) - 1
if secret_value in secrets:
playbook['secrets'][secret_key] =\
secrets.index(secret_value)
else:
secrets.append(secret_value)
playbook['secrets'][secret_key] = len(secrets) - 1
def freezeJob(self, context, tenant, layout, item,
redact_secrets_and_keys):
@ -2534,7 +2561,7 @@ class Job(ConfigObject):
# it's clear that the value ("REDACTED") is
# redacted.
for pb in v:
self._deduplicateSecrets(secrets, pb)
self._deduplicateSecrets(context, secrets, pb)
kw[k] = v
kw['secrets'] = secrets
kw['affected_projects'] = self._getAffectedProjects(tenant)
@ -5200,6 +5227,18 @@ class QueueItem(zkobject.ZKObject):
return True # This job's configuration has changed
return False
def getBlobKeys(self):
# Return a set of blob keys used by this item
# for each job in the frozen job graph
keys = set()
job_graph = self.current_build_set.job_graph
for job in job_graph.getJobs():
for pb in job.all_playbooks:
for secret in pb['secrets'].values():
if isinstance(secret, dict) and 'blob' in secret:
keys.add(secret['blob'])
return keys
class Bundle:
"""Identifies a collection of changes that must be treated as one unit."""

View File

@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# docs/developer/model-changelog.rst
MODEL_API = 6
MODEL_API = 7

View File

@ -74,6 +74,7 @@ from zuul.model import (
)
from zuul.version import get_version_string
from zuul.zk import ZooKeeperClient
from zuul.zk.blob_store import BlobStore
from zuul.zk.cleanup import (
SemaphoreCleanupLock,
BuildRequestCleanupLock,
@ -679,6 +680,7 @@ class Scheduler(threading.Thread):
self._runExecutorApiCleanup()
self._runMergerApiCleanup()
self._runLayoutDataCleanup()
self._runBlobStoreCleanup()
self.maintainConnectionCache()
except Exception:
self.log.exception("Error in general cleanup:")
@ -721,6 +723,40 @@ class Scheduler(threading.Thread):
except Exception:
self.log.exception("Error in layout data cleanup:")
def _runBlobStoreCleanup(self):
self.log.debug("Starting blob store cleanup")
try:
live_blobs = set()
with self.layout_lock:
# get the start ltime so that we can filter out any
# blobs used since this point
start_ltime = self.zk_client.getCurrentLtime()
# lock and refresh the pipeline
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
with pipeline_lock(
self.zk_client, tenant.name, pipeline.name,
) as lock:
ctx = self.createZKContext(lock, self.log)
pipeline.state.refresh(ctx)
# add any blobstore references
for item in pipeline.getAllItems(include_old=True):
live_blobs.update(item.getBlobKeys())
ctx = self.createZKContext(None, self.log)
blobstore = BlobStore(ctx)
# get the set of blob keys unused since the start time
# (ie, we have already filtered any newly added keys)
unused_blobs = blobstore.getKeysLastUsedBefore(start_ltime)
# remove the current refences
unused_blobs -= live_blobs
# delete what's left
for key in unused_blobs:
self.log.debug("Deleting unused blob: %s", key)
blobstore.delete(key, start_ltime)
self.log.debug("Finished blob store cleanup")
except Exception:
self.log.exception("Error in blob store cleanup:")
def _runBuildRequestCleanup(self):
# If someone else is running the cleanup, skip it.
if self.build_request_cleanup_lock.acquire(blocking=False):

201
zuul/zk/blob_store.py Normal file
View File

@ -0,0 +1,201 @@
# Copyright 2020 BMW Group
# Copyright 2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
from kazoo.exceptions import NoNodeError
from kazoo.retry import KazooRetry
from zuul.zk.locks import locked, SessionAwareLock
from zuul.zk.zkobject import LocalZKContext, ZKContext
from zuul.zk import sharding
class BlobStore:
_retry_interval = 5
data_root = "/zuul/cache/blob/data"
lock_root = "/zuul/cache/blob/lock"
def __init__(self, context):
self.context = context
def _getRootPath(self, key):
return f"{self.data_root}/{key[0:2]}/{key}"
def _getPath(self, key):
root = self._getRootPath(key)
return f"{root}/data"
def _getFlagPath(self, key):
root = self._getRootPath(key)
return f"{root}/complete"
def _retry(self, context, func, *args, max_tries=-1, **kw):
kazoo_retry = KazooRetry(max_tries=max_tries,
interrupt=context.sessionIsInvalid,
delay=self._retry_interval, backoff=0,
ignore_expire=False)
try:
return kazoo_retry(func, *args, **kw)
except InterruptedError:
pass
@staticmethod
def _retryableLoad(context, key, path, flag):
if not context.client.exists(flag):
raise KeyError(key)
with sharding.BufferedShardReader(context.client, path) as stream:
data = stream.read()
compressed_size = stream.compressed_bytes_read
context.cumulative_read_time += stream.cumulative_read_time
context.cumulative_read_objects += 1
context.cumulative_read_znodes += stream.znodes_read
context.cumulative_read_bytes += compressed_size
return data, compressed_size
def get(self, key):
path = self._getPath(key)
flag = self._getFlagPath(key)
if self.context.sessionIsInvalid():
raise Exception("ZooKeeper session or lock not valid")
data, compressed_size = self._retry(self.context, self._retryableLoad,
self.context, key, path, flag)
return data
def _checkKey(self, key):
# This returns whether the key is in the store. If it is in
# the store, it also touches the flag file so that the cleanup
# routine can know the last time an entry was used.
flag = self._getFlagPath(key)
if self.context.sessionIsInvalid():
raise Exception("ZooKeeper session or lock not valid")
ret = self._retry(self.context, self.context.client.exists,
flag)
if not ret:
return False
self._retry(self.context, self.context.client.set,
flag, b'')
return True
@staticmethod
def _retryableSave(context, path, flag, data):
with sharding.BufferedShardWriter(context.client, path) as stream:
stream.truncate(0)
stream.write(data)
stream.flush()
context.client.ensure_path(flag)
compressed_size = stream.compressed_bytes_written
context.cumulative_write_time += stream.cumulative_write_time
context.cumulative_write_objects += 1
context.cumulative_write_znodes += stream.znodes_written
context.cumulative_write_bytes += compressed_size
return compressed_size
def put(self, data):
if isinstance(self.context, LocalZKContext):
return None
if self.context.sessionIsInvalid():
raise Exception("ZooKeeper session or lock not valid")
hasher = hashlib.sha256()
hasher.update(data)
key = hasher.hexdigest()
path = self._getPath(key)
flag = self._getFlagPath(key)
if self._checkKey(key):
return key
with locked(
SessionAwareLock(
self.context.client,
f"{self.lock_root}/{key}"),
blocking=True
) as lock:
if self._checkKey(key):
return key
# make a new context based on the old one
locked_context = ZKContext(self.context.client, lock,
self.context.stop_event,
self.context.log)
self._retry(
locked_context,
self._retryableSave,
locked_context, path, flag, data)
self.context.updateStatsFromOtherContext(locked_context)
return key
def delete(self, key, ltime):
path = self._getRootPath(key)
flag = self._getFlagPath(key)
if self.context.sessionIsInvalid():
raise Exception("ZooKeeper session or lock not valid")
try:
with locked(
SessionAwareLock(
self.context.client,
f"{self.lock_root}/{key}"),
blocking=True
) as lock:
# make a new context based on the old one
locked_context = ZKContext(self.context.client, lock,
self.context.stop_event,
self.context.log)
# Double check that it hasn't been used since we
# decided to delete it
data, zstat = self._retry(locked_context,
self.context.client.get,
flag)
if zstat.last_modified_transaction_id < ltime:
self._retry(locked_context, self.context.client.delete,
path, recursive=True)
except NoNodeError:
raise KeyError(key)
def __iter__(self):
try:
hashdirs = self.context.client.get_children(self.data_root)
except NoNodeError:
return
for hashdir in hashdirs:
try:
for key in self.context.client.get_children(
f'{self.data_root}/{hashdir}'):
yield key
except NoNodeError:
pass
def __len__(self):
return len([x for x in self])
def getKeysLastUsedBefore(self, ltime):
ret = set()
for key in self:
flag = self._getFlagPath(key)
data, zstat = self._retry(self.context, self.context.client.get,
flag)
if zstat.last_modified_transaction_id < ltime:
ret.add(key)
return ret

View File

@ -22,11 +22,16 @@ from kazoo.exceptions import NodeExistsError, NoNodeError
from kazoo.retry import KazooRetry
from zuul.zk import sharding
from zuul.zk import ZooKeeperClient
class ZKContext:
def __init__(self, zk_client, lock, stop_event, log):
self.client = zk_client.client
if isinstance(zk_client, ZooKeeperClient):
client = zk_client.client
else:
client = zk_client
self.client = client
self.lock = lock
self.stop_event = stop_event
self.log = log
@ -46,6 +51,16 @@ class ZKContext:
def sessionIsInvalid(self):
return not self.sessionIsValid()
def updateStatsFromOtherContext(self, other):
self.cumulative_read_time += other.cumulative_read_time
self.cumulative_write_time += other.cumulative_write_time
self.cumulative_read_objects += other.cumulative_read_objects
self.cumulative_write_objects += other.cumulative_write_objects
self.cumulative_read_znodes += other.cumulative_read_znodes
self.cumulative_write_znodes += other.cumulative_write_znodes
self.cumulative_read_bytes += other.cumulative_read_bytes
self.cumulative_write_bytes += other.cumulative_write_bytes
class LocalZKContext:
"""A Local ZKContext that means don't actually write anything to ZK"""