diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst
index 7562d9239a..efdf50e3d8 100644
--- a/doc/source/developer/model-changelog.rst
+++ b/doc/source/developer/model-changelog.rst
@@ -70,3 +70,12 @@ Version 6
 :Prior Zuul version: 5.2.0
 :Description: Stores the complete layout min_ltimes in /zuul/layout-data.
               This only affects schedulers.
+
+Version 7
+---------
+
+:Prior Zuul version: 5.2.2
+:Description: Adds the blob store and stores large secrets in it.
+              Playbook secret references are now either an integer
+              index into the job secret list, or a dict with a blob
+              store key. This affects schedulers and executors.
diff --git a/doc/source/developer/zookeeper.rst b/doc/source/developer/zookeeper.rst
index 3bd4c7d45a..c14047ad64 100644
--- a/doc/source/developer/zookeeper.rst
+++ b/doc/source/developer/zookeeper.rst
@@ -149,6 +149,18 @@ This is a reference for object layout in Zookeeper.
 
    These are sharded JSON blobs of the change data.
 
+.. path:: zuul/cache/blob/data
+
+   Data for the blob store. These nodes are identified by the sha256
+   checksum of the secret content.
+
+   These are sharded blobs of data.
+
+.. path:: zuul/cache/blob/lock
+
+   Side-channel lock directory for the blob store. The store locks
+   by key ID under this znode when writing.
+
 .. path:: zuul/cleanup
 
    This node holds locks for the cleanup routines to make sure that
diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py
index 83fdb692e4..536c4f91d3 100644
--- a/tests/unit/test_model_upgrade.py
+++ b/tests/unit/test_model_upgrade.py
@@ -177,6 +177,10 @@ class TestModelUpgrade(ZuulTestCase):
         self.assertEqual(first.sched.local_layout_state.get("tenant-one"),
                          second.sched.local_layout_state.get("tenant-one"))
 
+    # No test for model version 7 (secrets in blob store): old and new
+    # code paths are exercised in existing tests since small secrets
+    # don't use the blob store.
+
 
 class TestSemaphoreModelUpgrade(ZuulTestCase):
     tenant_config_file = 'config/semaphore/main.yaml'
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 2dbae108b3..279c1d1cad 100644
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -25,12 +25,14 @@ from time import sleep
 from unittest import skip, skipIf
 from zuul.lib import yamlutil
 
+import fixtures
 import git
 import paramiko
 
 import zuul.configloader
 from zuul.lib import yamlutil as yaml
 from zuul.model import MergeRequest
+from zuul.zk.blob_store import BlobStore
 
 from tests.base import (
     AnsibleZuulTestCase,
@@ -5641,6 +5643,39 @@ class TestSecrets(ZuulTestCase):
             self._getSecrets('project2-complex', 'playbooks'),
             [secret])
 
+    def test_blobstore_secret(self):
+        # Test the large secret blob store
+        self.executor_server.hold_jobs_in_build = True
+        self.useFixture(fixtures.MonkeyPatch(
+            'zuul.model.Job.SECRET_BLOB_SIZE',
+            1))
+
+        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        context = self.scheds.first.sched.createZKContext(None, self.log)
+        bs = BlobStore(context)
+        self.assertEqual(len(bs), 1)
+
+        self.scheds.first.sched._runBlobStoreCleanup()
+        self.assertEqual(len(bs), 1)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(A.reported, 1, "A should report success")
+        self.assertHistory([
+            dict(name='project1-secret', result='SUCCESS', changes='1,1'),
+        ])
+        self.assertEqual(
+            [{'secret_name': self.secret}],
+            self._getSecrets('project1-secret', 'playbooks'))
+
+        self.scheds.first.sched._runBlobStoreCleanup()
+        self.assertEqual(len(bs), 0)
+
 
 class TestSecretInheritance(ZuulTestCase):
     tenant_config_file = 'config/secret-inheritance/main.yaml'
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index c8b8c74a87..e2893e9dfb 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -25,6 +25,7 @@ from zuul import model
 from zuul.lib import yamlutil as yaml
 from zuul.model import BuildRequest, HoldRequest, MergeRequest
 from zuul.zk import ZooKeeperClient
+from zuul.zk.blob_store import BlobStore
 from zuul.zk.branch_cache import BranchCache
 from zuul.zk.change_cache import (
     AbstractChangeCache,
@@ -1980,3 +1981,37 @@ class TestConfigurationErrorList(ZooKeeperBaseTestCase):
         self.assertEqual(el1.errors[0], e1)
         self.assertNotEqual(e1, e2)
         self.assertEqual([e1, e2], [e1, e2])
+
+
+class TestBlobStore(ZooKeeperBaseTestCase):
+    def test_blob_store(self):
+        stop_event = threading.Event()
+        self.zk_client.client.create('/zuul/pipeline', makepath=True)
+        # Create a new object
+        tenant_name = 'fake_tenant'
+
+        start_ltime = self.zk_client.getCurrentLtime()
+        with tenant_write_lock(self.zk_client, tenant_name) as lock:
+            context = ZKContext(self.zk_client, lock, stop_event, self.log)
+            bs = BlobStore(context)
+            with testtools.ExpectedException(KeyError):
+                bs.get('nope')
+
+            path = bs.put(b'something')
+
+            self.assertEqual(bs.get(path), b'something')
+            self.assertEqual([x for x in bs], [path])
+            self.assertEqual(len(bs), 1)
+
+            self.assertTrue(path in bs)
+            self.assertFalse('nope' in bs)
+            self.assertTrue(bs._checkKey(path))
+            self.assertFalse(bs._checkKey('nope'))
+
+            cur_ltime = self.zk_client.getCurrentLtime()
+            self.assertEqual(bs.getKeysLastUsedBefore(cur_ltime), {path})
+            self.assertEqual(bs.getKeysLastUsedBefore(start_ltime), set())
+            bs.delete(path, cur_ltime)
+
+            with testtools.ExpectedException(KeyError):
+                bs.get(path)
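For illustration: with model API 7, the values of a frozen playbook's secrets dict take one of the two forms described in the model-changelog entry above, and the executor change below resolves them accordingly. A minimal sketch of that resolution, where job_secrets and blob_data are hypothetical stand-ins for FrozenJob.secrets and the ZooKeeper-backed blob store (the 'deadbeef' key is a placeholder, not a real sha256 digest):

    import json

    # Hypothetical stand-ins for FrozenJob.secrets and the blob store.
    job_secrets = [{'encrypted_data': 'small-secret'}]
    blob_data = {
        'deadbeef': json.dumps(
            {'encrypted_data': 'large-secret'}).encode('utf-8'),
    }

    def resolve(ref):
        # A dict reference carries a blob store key; an integer indexes
        # the job's deduplicated secret list (the only form before API 7).
        if isinstance(ref, dict):
            return json.loads(blob_data[ref['blob']].decode('utf-8'))
        return job_secrets[ref]

    assert resolve(0)['encrypted_data'] == 'small-secret'
    assert resolve({'blob': 'deadbeef'})['encrypted_data'] == 'large-secret'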
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 158042be41..4a7e144bdb 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -74,6 +74,7 @@ import zuul.model
 from zuul.nodepool import Nodepool
 from zuul.version import get_version_string
 from zuul.zk.event_queues import PipelineResultEventQueue
+from zuul.zk.blob_store import BlobStore
 from zuul.zk.components import ExecutorComponent, COMPONENT_REGISTRY
 from zuul.zk.exceptions import JobRequestNotFound
 from zuul.zk.executor import ExecutorApi
@@ -2121,8 +2122,14 @@ class AnsibleJob(object):
         """
         ret = {}
+        blobstore = BlobStore(self.executor_server.zk_context)
         for secret_name, secret_index in secrets.items():
-            frozen_secret = self.job.secrets[secret_index]
+            if isinstance(secret_index, dict):
+                key = secret_index['blob']
+                data = blobstore.get(key)
+                frozen_secret = json.loads(data.decode('utf-8'))
+            else:
+                frozen_secret = self.job.secrets[secret_index]
             secret = zuul.model.Secret(secret_name, None)
             secret.secret_data = yaml.encrypted_load(
                 frozen_secret['encrypted_data'])
diff --git a/zuul/model.py b/zuul/model.py
index f5604f196d..0f632264f4 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -46,6 +46,7 @@ from zuul.lib.logutil import get_annotated_logger
 from zuul.lib.capabilities import capabilities_registry
 from zuul.lib.jsonutil import json_dumps
 from zuul.zk import zkobject
+from zuul.zk.blob_store import BlobStore
 from zuul.zk.change_cache import ChangeKey
 from zuul.zk.components import COMPONENT_REGISTRY
 
@@ -2280,6 +2281,12 @@ class FrozenJob(zkobject.ZKObject):
             _artifact_data=self._makeJobData(
                 context, 'artifact_data', artifact_data))
 
+    @property
+    def all_playbooks(self):
+        for k in ('pre_run', 'run', 'post_run', 'cleanup_run'):
+            playbooks = getattr(self, k)
+            yield from playbooks
+
 
 class Job(ConfigObject):
     """A Job represents the defintion of actions to perform.
@@ -2298,6 +2305,8 @@ class Job(ConfigObject):
     empty_nodeset = NodeSet()
 
     BASE_JOB_MARKER = object()
+    # Secrets larger than this size will be put in the blob store
+    SECRET_BLOB_SIZE = 10 * 1024
 
     def isBase(self):
         return self.parent is self.BASE_JOB_MARKER
@@ -2490,17 +2499,35 @@ class Job(ConfigObject):
                 role['project'] = role_project.name
         return d
 
-    def _deduplicateSecrets(self, secrets, playbook):
+    def _deduplicateSecrets(self, context, secrets, playbook):
         # secrets is a list of secrets accumulated so far
         # playbook is a frozen playbook from _freezePlaybook
+        # At the end of this method, the values in the playbook
+        # secrets dictionary will have been mutated to be either an
+        # integer (an index into the job's secret list) or a dict
+        # (a pointer to a key in the global blob store).
+
+        blobstore = BlobStore(context)
+
+        # Cast to list so we can modify in place
         for secret_key, secret_value in list(playbook['secrets'].items()):
-            if secret_value in secrets:
-                playbook['secrets'][secret_key] = secrets.index(secret_value)
+            secret_serialized = json_dumps(
+                secret_value, sort_keys=True).encode("utf8")
+            if (COMPONENT_REGISTRY.model_api >= 7 and
+                    len(secret_serialized) > self.SECRET_BLOB_SIZE):
+                # If the secret is large, store it in the blob store
+                # and store the key in the playbook secrets dict.
+                blob_key = blobstore.put(secret_serialized)
+                playbook['secrets'][secret_key] = {'blob': blob_key}
             else:
-                secrets.append(secret_value)
-                playbook['secrets'][secret_key] = len(secrets) - 1
+                if secret_value in secrets:
+                    playbook['secrets'][secret_key] =\
+                        secrets.index(secret_value)
+                else:
+                    secrets.append(secret_value)
+                    playbook['secrets'][secret_key] = len(secrets) - 1
 
     def freezeJob(self, context, tenant, layout, item,
                   redact_secrets_and_keys):
@@ -2534,7 +2561,7 @@ class Job(ConfigObject):
                     # it's clear that the value ("REDACTED") is
                     # redacted.
                     for pb in v:
-                        self._deduplicateSecrets(secrets, pb)
+                        self._deduplicateSecrets(context, secrets, pb)
             kw[k] = v
         kw['secrets'] = secrets
         kw['affected_projects'] = self._getAffectedProjects(tenant)
@@ -5200,6 +5227,18 @@ class QueueItem(zkobject.ZKObject):
                 return True  # This job's configuration has changed
         return False
 
+    def getBlobKeys(self):
+        # Return the set of blob keys used by this item,
+        # for each job in the frozen job graph
+        keys = set()
+        job_graph = self.current_build_set.job_graph
+        for job in job_graph.getJobs():
+            for pb in job.all_playbooks:
+                for secret in pb['secrets'].values():
+                    if isinstance(secret, dict) and 'blob' in secret:
+                        keys.add(secret['blob'])
+        return keys
+
 
 class Bundle:
     """Identifies a collection of changes that must be treated as one unit."""
diff --git a/zuul/model_api.py b/zuul/model_api.py
index 93e34634a9..05286dad5f 100644
--- a/zuul/model_api.py
+++ b/zuul/model_api.py
@@ -14,4 +14,4 @@
 
 # When making ZK schema changes, increment this and add a record to
 # docs/developer/model-changelog.rst
-MODEL_API = 6
+MODEL_API = 7
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index efc7711a28..ea87172225 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -74,6 +74,7 @@ from zuul.model import (
 )
 from zuul.version import get_version_string
 from zuul.zk import ZooKeeperClient
+from zuul.zk.blob_store import BlobStore
 from zuul.zk.cleanup import (
     SemaphoreCleanupLock,
     BuildRequestCleanupLock,
@@ -679,6 +680,7 @@ class Scheduler(threading.Thread):
                 self._runExecutorApiCleanup()
                 self._runMergerApiCleanup()
                 self._runLayoutDataCleanup()
+                self._runBlobStoreCleanup()
                 self.maintainConnectionCache()
             except Exception:
                 self.log.exception("Error in general cleanup:")
@@ -721,6 +723,40 @@ class Scheduler(threading.Thread):
         except Exception:
             self.log.exception("Error in layout data cleanup:")
 
+    def _runBlobStoreCleanup(self):
+        self.log.debug("Starting blob store cleanup")
+        try:
+            live_blobs = set()
+            with self.layout_lock:
+                # get the start ltime so that we can filter out any
+                # blobs used since this point
+                start_ltime = self.zk_client.getCurrentLtime()
+                # lock and refresh each pipeline
+                for tenant in self.abide.tenants.values():
+                    for pipeline in tenant.layout.pipelines.values():
+                        with pipeline_lock(
+                                self.zk_client, tenant.name, pipeline.name,
+                        ) as lock:
+                            ctx = self.createZKContext(lock, self.log)
+                            pipeline.state.refresh(ctx)
+                            # add any blobstore references
+                            for item in pipeline.getAllItems(
+                                    include_old=True):
+                                live_blobs.update(item.getBlobKeys())
+            ctx = self.createZKContext(None, self.log)
+            blobstore = BlobStore(ctx)
+            # get the set of blob keys unused since the start time
+            # (i.e., we have already filtered out any newly added keys)
+            unused_blobs = blobstore.getKeysLastUsedBefore(start_ltime)
+            # remove the current references
+            unused_blobs -= live_blobs
+            # delete what's left
+            for key in unused_blobs:
+                self.log.debug("Deleting unused blob: %s", key)
+                blobstore.delete(key, start_ltime)
self.log.debug("Finished blob store cleanup") + except Exception: + self.log.exception("Error in blob store cleanup:") + def _runBuildRequestCleanup(self): # If someone else is running the cleanup, skip it. if self.build_request_cleanup_lock.acquire(blocking=False): diff --git a/zuul/zk/blob_store.py b/zuul/zk/blob_store.py new file mode 100644 index 0000000000..5547faff64 --- /dev/null +++ b/zuul/zk/blob_store.py @@ -0,0 +1,201 @@ +# Copyright 2020 BMW Group +# Copyright 2022 Acme Gating, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import hashlib + +from kazoo.exceptions import NoNodeError +from kazoo.retry import KazooRetry + +from zuul.zk.locks import locked, SessionAwareLock +from zuul.zk.zkobject import LocalZKContext, ZKContext +from zuul.zk import sharding + + +class BlobStore: + _retry_interval = 5 + data_root = "/zuul/cache/blob/data" + lock_root = "/zuul/cache/blob/lock" + + def __init__(self, context): + self.context = context + + def _getRootPath(self, key): + return f"{self.data_root}/{key[0:2]}/{key}" + + def _getPath(self, key): + root = self._getRootPath(key) + return f"{root}/data" + + def _getFlagPath(self, key): + root = self._getRootPath(key) + return f"{root}/complete" + + def _retry(self, context, func, *args, max_tries=-1, **kw): + kazoo_retry = KazooRetry(max_tries=max_tries, + interrupt=context.sessionIsInvalid, + delay=self._retry_interval, backoff=0, + ignore_expire=False) + try: + return kazoo_retry(func, *args, **kw) + except InterruptedError: + pass + + @staticmethod + def _retryableLoad(context, key, path, flag): + if not context.client.exists(flag): + raise KeyError(key) + with sharding.BufferedShardReader(context.client, path) as stream: + data = stream.read() + compressed_size = stream.compressed_bytes_read + context.cumulative_read_time += stream.cumulative_read_time + context.cumulative_read_objects += 1 + context.cumulative_read_znodes += stream.znodes_read + context.cumulative_read_bytes += compressed_size + return data, compressed_size + + def get(self, key): + path = self._getPath(key) + flag = self._getFlagPath(key) + + if self.context.sessionIsInvalid(): + raise Exception("ZooKeeper session or lock not valid") + + data, compressed_size = self._retry(self.context, self._retryableLoad, + self.context, key, path, flag) + return data + + def _checkKey(self, key): + # This returns whether the key is in the store. If it is in + # the store, it also touches the flag file so that the cleanup + # routine can know the last time an entry was used. 
+        flag = self._getFlagPath(key)
+
+        if self.context.sessionIsInvalid():
+            raise Exception("ZooKeeper session or lock not valid")
+
+        ret = self._retry(self.context, self.context.client.exists,
+                          flag)
+        if not ret:
+            return False
+        self._retry(self.context, self.context.client.set,
+                    flag, b'')
+        return True
+
+    @staticmethod
+    def _retryableSave(context, path, flag, data):
+        with sharding.BufferedShardWriter(context.client, path) as stream:
+            stream.truncate(0)
+            stream.write(data)
+            stream.flush()
+            context.client.ensure_path(flag)
+            compressed_size = stream.compressed_bytes_written
+            context.cumulative_write_time += stream.cumulative_write_time
+            context.cumulative_write_objects += 1
+            context.cumulative_write_znodes += stream.znodes_written
+            context.cumulative_write_bytes += compressed_size
+        return compressed_size
+
+    def put(self, data):
+        if isinstance(self.context, LocalZKContext):
+            return None
+
+        if self.context.sessionIsInvalid():
+            raise Exception("ZooKeeper session or lock not valid")
+
+        hasher = hashlib.sha256()
+        hasher.update(data)
+        key = hasher.hexdigest()
+
+        path = self._getPath(key)
+        flag = self._getFlagPath(key)
+
+        if self._checkKey(key):
+            return key
+
+        with locked(
+                SessionAwareLock(
+                    self.context.client,
+                    f"{self.lock_root}/{key}"),
+                blocking=True
+        ) as lock:
+            if self._checkKey(key):
+                return key
+
+            # make a new context based on the old one
+            locked_context = ZKContext(self.context.client, lock,
+                                       self.context.stop_event,
+                                       self.context.log)
+
+            self._retry(
+                locked_context,
+                self._retryableSave,
+                locked_context, path, flag, data)
+            self.context.updateStatsFromOtherContext(locked_context)
+        return key
+
+    def delete(self, key, ltime):
+        path = self._getRootPath(key)
+        flag = self._getFlagPath(key)
+        if self.context.sessionIsInvalid():
+            raise Exception("ZooKeeper session or lock not valid")
+        try:
+            with locked(
+                    SessionAwareLock(
+                        self.context.client,
+                        f"{self.lock_root}/{key}"),
+                    blocking=True
+            ) as lock:
+                # make a new context based on the old one
+                locked_context = ZKContext(self.context.client, lock,
+                                           self.context.stop_event,
+                                           self.context.log)
+
+                # Double check that it hasn't been used since we
+                # decided to delete it
+                data, zstat = self._retry(locked_context,
+                                          self.context.client.get,
+                                          flag)
+                if zstat.last_modified_transaction_id < ltime:
+                    self._retry(locked_context, self.context.client.delete,
+                                path, recursive=True)
+        except NoNodeError:
+            raise KeyError(key)
+
+    def __iter__(self):
+        try:
+            hashdirs = self.context.client.get_children(self.data_root)
+        except NoNodeError:
+            return
+
+        for hashdir in hashdirs:
+            try:
+                for key in self.context.client.get_children(
+                        f'{self.data_root}/{hashdir}'):
+                    yield key
+            except NoNodeError:
+                pass
+
+    def __len__(self):
+        return len([x for x in self])
+
+    def getKeysLastUsedBefore(self, ltime):
+        ret = set()
+        for key in self:
+            flag = self._getFlagPath(key)
+            data, zstat = self._retry(self.context, self.context.client.get,
+                                      flag)
+            if zstat.last_modified_transaction_id < ltime:
+                ret.add(key)
+        return ret
diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py
index 3905e254ce..aa32b8b9be 100644
--- a/zuul/zk/zkobject.py
+++ b/zuul/zk/zkobject.py
@@ -22,11 +22,16 @@ from kazoo.exceptions import NodeExistsError, NoNodeError
 from kazoo.retry import KazooRetry
 
 from zuul.zk import sharding
+from zuul.zk import ZooKeeperClient
 
 
 class ZKContext:
     def __init__(self, zk_client, lock, stop_event, log):
-        self.client = zk_client.client
+        if isinstance(zk_client, ZooKeeperClient):
+            client = zk_client.client
+        else:
+            client = zk_client
+        self.client = client
         self.lock = lock
         self.stop_event = stop_event
         self.log = log
@@ -46,6 +51,16 @@ class ZKContext:
     def sessionIsInvalid(self):
         return not self.sessionIsValid()
 
+    def updateStatsFromOtherContext(self, other):
+        self.cumulative_read_time += other.cumulative_read_time
+        self.cumulative_write_time += other.cumulative_write_time
+        self.cumulative_read_objects += other.cumulative_read_objects
+        self.cumulative_write_objects += other.cumulative_write_objects
+        self.cumulative_read_znodes += other.cumulative_read_znodes
+        self.cumulative_write_znodes += other.cumulative_write_znodes
+        self.cumulative_read_bytes += other.cumulative_read_bytes
+        self.cumulative_write_bytes += other.cumulative_write_bytes
+
 
 class LocalZKContext:
     """A Local ZKContext that means don't actually write anything to ZK"""
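The blob store added in zuul/zk/blob_store.py above is content-addressed: a blob's key is the sha256 hexdigest of its content, and every znode path derives from that key. A standalone sketch of the path scheme, mirroring _getRootPath, _getPath, and _getFlagPath (plain hashlib, no ZooKeeper connection required):

    import hashlib

    DATA_ROOT = "/zuul/cache/blob/data"
    LOCK_ROOT = "/zuul/cache/blob/lock"

    def paths_for(data):
        # The key is the sha256 hexdigest of the blob content; its first
        # two hex characters form an intermediate fan-out directory.
        key = hashlib.sha256(data).hexdigest()
        root = f"{DATA_ROOT}/{key[0:2]}/{key}"
        return {
            'data': f"{root}/data",          # sharded blob content
            'complete': f"{root}/complete",  # flag; mtime records last use
            'lock': f"{LOCK_ROOT}/{key}",    # side-channel write/delete lock
        }

    for name, path in paths_for(b'something').items():
        print(name, path)

Because identical content hashes to the same key, put() can return early when _checkKey() finds the complete flag already present, and concurrent writers of the same blob serialize on the per-key lock.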
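The cleanup in zuul/scheduler.py relies on ordering: the ltime snapshot is taken before the pipelines are scanned, so any blob written or touched after the snapshot is excluded by getKeysLastUsedBefore(start_ltime), and delete() re-checks the flag's last-modified ltime under the per-key lock before removing anything. A toy model of that ordering, with plain integers standing in for ZooKeeper transaction ltimes (all names here are illustrative):

    # key -> ltime at which its 'complete' flag was last touched
    last_used = {'stale': 5, 'referenced': 6, 'written-later': 12}

    def cleanup(live_keys, start_ltime):
        # Only keys last used strictly before the snapshot are candidates;
        # 'written-later' survives even though nothing references it yet.
        candidates = {k for k, lt in last_used.items() if lt < start_ltime}
        # Keys referenced by any live or recently dequeued item are kept.
        candidates -= live_keys
        for key in candidates:
            # The real delete() re-verifies the flag ltime under a lock.
            del last_used[key]
        return candidates

    start_ltime = 10  # snapshot taken before refreshing pipeline state
    assert cleanup({'referenced'}, start_ltime) == {'stale'}
    assert 'written-later' in last_used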