Move repo state to blobstore

Zuul has a content-addressable blobstore in ZooKeeper.  We use it for
storing secrets, because they can be quite large, and many of them are
the same for many (or every) queue item, so we rely on the
deduplication inherent in content-addressable storage to keep ZK
traffic and storage minimal.
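
Content-addressable here means the storage key is derived from a digest of the blob's content, so writing the same bytes twice yields the same key and a single stored blob. A toy sketch of the idea, with a dict standing in for ZooKeeper (the hash choice and class name are illustrative, not the real BlobStore internals):

    import hashlib

    class ToyBlobStore:
        def __init__(self):
            self._blobs = {}  # stand-in for ZooKeeper nodes

        def put(self, data):
            # The key is the digest of the content, so identical
            # payloads deduplicate to one stored blob.
            key = hashlib.sha256(data).hexdigest()
            self._blobs.setdefault(key, data)
            return key

        def get(self, key):
            return self._blobs[key]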

The same (with a few tweaks) could be true for repo state objects as
well.  The repo state is a dictionary of connection -> project -> refs.
Queue items with exactly the same set of involved projects should end
up with identical repo states (assuming they were started around the
same time and a branch has not since advanced).  But queue items may
not have exactly the same projects; they may share a common set, but
then one might have one extra project.  To promote reuse while
accommodating this, we will store per-project repo states in the blob
store.  The queue items will then store a list of blobstore keys.
When we need to deal with the repo state for a queue item, we will
load in all of the project-repo-states specified by that list of keys
and combine them.
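
Combining is just a per-connection dictionary merge over the fetched blobs; a minimal sketch under the ToyBlobStore assumptions above (the real logic is RepoState.load() in the diff below):

    import json

    def combine_repo_state(blobstore, keys):
        # Each blob holds {connection: {project: refs}} for one
        # project; merging rebuilds the item's full repo state.
        combined = {}
        for key in keys:
            per_project = json.loads(blobstore.get(key).decode('utf-8'))
            for connection, projects in per_project.items():
                combined.setdefault(connection, {}).update(projects)
        return combined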

Likewise, when we update a repo state with the results of a merger
operation, we will split the repo state when storing it in the blob
store.
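
Splitting is the inverse: serialize one {connection: {project: refs}} dictionary per project and store each blob separately. Sorting keys keeps the serialization, and therefore the content-addressed key, stable for identical states. A sketch mirroring RepoState.add() in the diff below:

    import json

    def split_repo_state(blobstore, repo_state):
        # Returns one blobstore key per (connection, project) pair;
        # unchanged projects hash to existing blobs and are reused.
        keys = []
        for connection, projects in repo_state.items():
            for project, refs in projects.items():
                blob = json.dumps({connection: {project: refs}},
                                  sort_keys=True).encode('utf8')
                keys.append(blobstore.put(blob))
        return keys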

Change-Id: I2b276a072b48b91dec66d2f8e601a2b5c9128429
James E. Blair 2024-05-23 13:37:28 -07:00
commit 3102b75a48 (parent 8c8f2ee511)
9 changed files with 162 additions and 26 deletions


@@ -204,3 +204,9 @@ Version 27
 :Prior Zuul version: 10.0.0
 :Description: Refactor branch cache.
               Affects schedulers and web.
+
+Version 28
+----------
+:Prior Zuul version: 10.1.0
+:Description: Store repo state in blobstore.
+              Affects schedulers and executor.


@@ -157,6 +157,43 @@ class TestGithubModelUpgrade(ZuulTestCase):
             dict(name='project-test2', result='SUCCESS'),
         ], ordered=False)

+    @model_version(27)
+    @simple_layout('layouts/two-projects-integrated.yaml')
+    def test_model_27_28(self):
+        # This exercises the repo state blobstore upgrade
+        self.hold_merge_jobs_in_queue = True
+        self.executor_server.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        jobs = list(self.merger_api.queued())
+        self.merger_api.release(jobs[0])
+        self.waitUntilSettled()
+
+        # Perform the upgrade between the first and second merge jobs
+        # to verify that we don't lose the data from the first merge
+        # job.
+        self.model_test_component_info.model_api = 28
+
+        jobs = list(self.merger_api.queued())
+        self.merger_api.release(jobs[0])
+        self.waitUntilSettled()
+
+        worker = list(self.executor_server.job_workers.values())[0]
+        gerrit_repo_state = worker.repo_state['gerrit']
+        self.assertTrue('org/common-config' in gerrit_repo_state)
+        self.assertTrue('org/project1' in gerrit_repo_state)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+        self.assertHistory([
+            dict(name='integration', result='SUCCESS'),
+        ], ordered=False)
+

 class TestBranchCacheUpgrade(BaseTestCase):
     def setUp(self):


@@ -7135,10 +7135,11 @@ class TestSecrets(ZuulTestCase):
         with self.scheds.first.sched.createZKContext(None, self.log)\
                 as context:
             bs = BlobStore(context)
-            self.assertEqual(len(bs), 1)
+            # 1 secret, 2 repo states (project1 and common-config)
+            self.assertEqual(len(bs), 3)

             self.scheds.first.sched._runBlobStoreCleanup()
-            self.assertEqual(len(bs), 1)
+            self.assertEqual(len(bs), 3)

         self.executor_server.hold_jobs_in_build = False
         self.executor_server.release()


@@ -1198,6 +1198,7 @@ class TestWeb(BaseTestWeb):
                 'override_checkout': None,
                 'merge_repo_state_ref': None,
                 'extra_repo_state_ref': None,
+                'repo_state_keys': [],
                 'playbooks': [{
                     'connection': 'gerrit',
                     'project': 'common-config',
@@ -1309,6 +1310,7 @@
                 'projects': [],
                 'merge_repo_state_ref': None,
                 'extra_repo_state_ref': None,
+                'repo_state_keys': [],
                 'secret_vars': None,
                 'ssh_keys': [],
                 'timeout': None,


@@ -112,6 +112,8 @@ def construct_build_params(uuid, connections, job, item, pipeline,
         params['branch'] = change.branch
     else:
        params['branch'] = None
+    params['repo_state_keys'] = item.current_build_set.repo_state_keys
+    # MODEL_API < 28
     params['merge_repo_state_ref'] = \
         item.current_build_set._merge_repo_state_path
     params['extra_repo_state_ref'] = \


@@ -75,6 +75,7 @@ from zuul.model import (
     FrozenJob,
     Job,
     MergeRepoState,
+    RepoState,
 )
 import zuul.model
 from zuul.nodepool import Nodepool
@@ -1287,21 +1288,31 @@ class AnsibleJob(object):
         )

     def loadRepoState(self):
-        merge_rs_path = self.arguments['merge_repo_state_ref']
-        with self.executor_server.zk_context as ctx:
-            merge_repo_state = merge_rs_path and MergeRepoState.fromZK(
-                ctx, merge_rs_path)
-            extra_rs_path = self.arguments['extra_repo_state_ref']
-            extra_repo_state = extra_rs_path and ExtraRepoState.fromZK(
-                ctx, extra_rs_path)
-        d = {}
-        # Combine the two
-        for rs in (merge_repo_state, extra_repo_state):
-            if not rs:
-                continue
-            for connection in rs.state.keys():
-                d.setdefault(connection, {}).update(
-                    rs.state.get(connection, {}))
+        repo_state_keys = self.arguments.get('repo_state_keys')
+        if repo_state_keys:
+            repo_state = RepoState()
+            with self.executor_server.zk_context as ctx:
+                blobstore = BlobStore(ctx)
+                for link in repo_state_keys:
+                    repo_state.load(blobstore, link)
+            d = repo_state.state
+        else:
+            # MODEL_API < 28
+            merge_rs_path = self.arguments['merge_repo_state_ref']
+            with self.executor_server.zk_context as ctx:
+                merge_repo_state = merge_rs_path and MergeRepoState.fromZK(
+                    ctx, merge_rs_path)
+                extra_rs_path = self.arguments['extra_repo_state_ref']
+                extra_repo_state = extra_rs_path and ExtraRepoState.fromZK(
+                    ctx, extra_rs_path)
+            d = {}
+            # Combine the two
+            for rs in (merge_repo_state, extra_repo_state):
+                if not rs:
+                    continue
+                for connection in rs.state.keys():
+                    d.setdefault(connection, {}).update(
+                        rs.state.get(connection, {}))
         # Ensure that we have an origin ref for every local branch.
         # Some of these will be overwritten later as we merge changes,
         # but for starters, we can use the current head of each


@@ -27,7 +27,7 @@ from zuul.lib.logutil import get_annotated_logger
 from zuul.lib.tarjan import strongly_connected_components
 import zuul.lib.tracing as tracing
 from zuul.model import (
-    Change, PipelineState, PipelineChangeList, QueueItem,
+    Change, PipelineState, PipelineChangeList,
     filter_severity, EnqueueEvent
 )
 from zuul.zk.change_cache import ChangeKey
@@ -1492,7 +1492,7 @@ class PipelineManager(metaclass=ABCMeta):
                 files_state=build_set.PENDING)
         return False

-    def scheduleGlobalRepoState(self, item: QueueItem) -> bool:
+    def scheduleGlobalRepoState(self, item):
         log = item.annotateLogger(self.log)

         tenant = item.pipeline.tenant
@@ -1536,7 +1536,9 @@
         build_set = item.current_build_set
         # If we skipped the initial repo state (for branch/ref items),
         # we need to include the merger items for the final repo state.
-        if build_set._merge_repo_state_path is None:
+        # MODEL_API < 28
+        if (build_set._merge_repo_state_path is None and
+                not build_set.repo_state_keys):
             new_items.extend(build_set.merger_items)

         for project in projects:


@@ -50,6 +50,7 @@ from zuul.lib import tracing
 from zuul.zk import zkobject
 from zuul.zk.blob_store import BlobStore
 from zuul.zk.change_cache import ChangeKey
+from zuul.zk.components import COMPONENT_REGISTRY
 from zuul.exceptions import (
     SEVERITY_ERROR,
     SEVERITY_WARNING,
@@ -925,7 +926,8 @@ class PipelineState(zkobject.ZKObject):
                 # Drop some attributes from local objects to save memory
                 build_set._set(_files=None,
                                _merge_repo_state=None,
-                               _extra_repo_state=None)
+                               _extra_repo_state=None,
+                               _repo_state=RepoState())
                 job_graph = build_set.job_graph
                 if not job_graph:
                     continue
@@ -4248,6 +4250,43 @@ class RepoFiles(zkobject.ShardedZKObject):
         return json.dumps(data, sort_keys=True).encode("utf8")


+class RepoState:
+    def __init__(self):
+        self.state = {}
+        self.state_keys = {}
+
+    def load(self, blobstore, key):
+        # Load a single project-repo-state from the blobstore and
+        # combine it with existing projects in this repo state.
+        if key in self.state_keys.values():
+            return
+        data = blobstore.get(key)
+        repo_state = json.loads(data.decode('utf-8'))
+        # Format is {connection: {project: state}}
+        for connection_name, connection_data in repo_state.items():
+            projects = self.state.setdefault(connection_name, {})
+            projects.update(connection_data)
+            for project_name, project_data in connection_data.items():
+                self.state_keys[(connection_name, project_name)] = key
+
+    def add(self, blobstore, repo_state):
+        # Split the incoming repo_state into individual
+        # project-repo-state objects in the blob store.
+        for connection_name, connection_data in repo_state.items():
+            projects = self.state.setdefault(connection_name, {})
+            for project_name, project_data in connection_data.items():
+                project_dict = {project_name: project_data}
+                connection_dict = {connection_name: project_dict}
+                serialized = json_dumps(
+                    connection_dict, sort_keys=True).encode("utf8")
+                key = blobstore.put(serialized)
+                projects.update(project_dict)
+                self.state_keys[(connection_name, project_name)] = key
+
+    def getKeys(self):
+        return self.state_keys.values()
+
+
 class BaseRepoState(zkobject.ShardedZKObject):
     """RepoState holds the repo state for a buildset
@@ -4348,6 +4387,7 @@ class BuildSet(zkobject.ZKObject):
             _merge_repo_state_path=None,  # ZK path for above
             _extra_repo_state=None,  # Repo state for any additional projects
             _extra_repo_state_path=None,  # ZK path for above
+            repo_state_keys=[],  # Refs (links) to blobstore repo_state
             tries={},
             files_state=self.NEW,
             repo_state_state=self.NEW,
@@ -4364,6 +4404,7 @@
             # Cached job graph of previous layout; not serialized
             _old_job_graph=None,
             _old_jobs={},
+            _repo_state=RepoState(),
         )

     def setFiles(self, items):
@@ -4385,6 +4426,10 @@
         self._files_path = repo_files.getPath()

     def getRepoState(self, context):
+        d = self._getRepoStateFromBlobstore(context)
+        if d:
+            return d
+        # MODEL_API < 28
         if self._merge_repo_state_path and self._merge_repo_state is None:
             try:
                 self._set(_merge_repo_state=MergeRepoState.fromZK(
@@ -4407,6 +4452,12 @@
                 d[connection].update(rs.state.get(connection, {}))
         return d

+    def _getRepoStateFromBlobstore(self, context):
+        blobstore = BlobStore(context)
+        for link in self.repo_state_keys:
+            self._repo_state.load(blobstore, link)
+        return self._repo_state.state
+
     def getFiles(self, context):
         if self._files is not None:
             return self._files
@@ -4436,11 +4487,22 @@
         return bool(errs)

     def setMergeRepoState(self, repo_state):
-        if self._merge_repo_state_path is not None:
-            raise Exception("Merge repo state can not be updated")
         if not self._active_context:
             raise Exception("setMergeRepoState must be used "
                             "with a context manager")
+        new = COMPONENT_REGISTRY.model_api >= 28
+        if (self._merge_repo_state_path is not None or
+                self._extra_repo_state_path is not None):
+            new = False
+        if new:
+            blobstore = BlobStore(self._active_context)
+            self._repo_state.add(blobstore, repo_state)
+            for key in self._repo_state.getKeys():
+                if key not in self.repo_state_keys:
+                    self.repo_state_keys.append(key)
+            return
+        if self._merge_repo_state_path is not None:
+            raise Exception("Merge repo state can not be updated")
         rs = MergeRepoState.new(self._active_context,
                                 state=repo_state,
                                 _buildset_path=self.getPath())
@ -4448,11 +4510,22 @@ class BuildSet(zkobject.ZKObject):
self._merge_repo_state_path = rs.getPath()
def setExtraRepoState(self, repo_state):
if self._extra_repo_state_path is not None:
raise Exception("Extra repo state can not be updated")
if not self._active_context:
raise Exception("setExtraRepoState must be used "
"with a context manager")
new = COMPONENT_REGISTRY.model_api >= 28
if (self._merge_repo_state_path is not None or
self._extra_repo_state_path is not None):
new = False
if new:
blobstore = BlobStore(self._active_context)
self._repo_state.add(blobstore, repo_state)
for key in self._repo_state.getKeys():
if key not in self.repo_state_keys:
self.repo_state_keys.append(key)
return
if self._extra_repo_state_path is not None:
raise Exception("Extra repo state can not be updated")
rs = ExtraRepoState.new(self._active_context,
state=repo_state,
_buildset_path=self.getPath())
@@ -4491,6 +4564,7 @@
             "files": self._files_path,
             "merge_repo_state": self._merge_repo_state_path,
             "extra_repo_state": self._extra_repo_state_path,
+            "repo_state_keys": self.repo_state_keys,
             "tries": self.tries,
             "files_state": self.files_state,
             "repo_state_state": self.repo_state_state,
@@ -6044,6 +6118,7 @@ class QueueItem(zkobject.ZKObject):
                 for secret in pb['secrets'].values():
                     if isinstance(secret, dict) and 'blob' in secret:
                         keys.add(secret['blob'])
+        keys.update(self.current_build_set.repo_state_keys)
         return keys

     def getEventChange(self):


@@ -14,4 +14,4 @@
 # When making ZK schema changes, increment this and add a record to
 # doc/source/developer/model-changelog.rst

-MODEL_API = 27
+MODEL_API = 28