Load repo state from pipeline state on executors

Instead of sending the repo state via the build request we only provide
the path to the merge/extra repo state that we already have as part of the
pipeline state. The executor will then load the repo states from the
given paths.

This way we reduce the amount of duplicate data we store in Zookeeper
which is especially important for the repo state, as it can become quite
large.

Change-Id: I7dc5d854dbee93af52d25b4462293c85eb7a1a8e
This commit is contained in:
Simon Westphahl 2021-11-17 11:28:24 +01:00
parent 4183d25f65
commit 384a8e82c9
3 changed files with 36 additions and 11 deletions

View File

@ -1058,7 +1058,8 @@ class TestWeb(BaseTestWeb):
'user_data': None}]},
'override_branch': None,
'override_checkout': None,
'repo_state': {},
'merge_repo_state_ref': None,
'extra_repo_state_ref': None,
'playbooks': [{
'connection': 'gerrit',
'project': 'common-config',
@ -1144,7 +1145,8 @@ class TestWeb(BaseTestWeb):
'override_checkout': None,
'post_timeout': None,
'projects': [],
'repo_state': {},
'merge_repo_state_ref': None,
'extra_repo_state_ref': None,
'secret_vars': None,
'ssh_keys': [],
'timeout': None,

View File

@ -94,7 +94,10 @@ def construct_build_params(uuid, connections, job, item, pipeline,
params['branch'] = None
params['override_branch'] = job.override_branch
params['override_checkout'] = job.override_checkout
params['repo_state'] = item.current_build_set.repo_state
merge_rs = item.current_build_set.merge_repo_state
params['merge_repo_state_ref'] = merge_rs and merge_rs.getPath()
extra_rs = item.current_build_set.extra_repo_state
params['extra_repo_state_ref'] = extra_rs and extra_rs.getPath()
params['ansible_version'] = job.ansible_version
params['workspace_scheme'] = job.workspace_scheme

View File

@ -62,6 +62,8 @@ from zuul.model import (
BuildRequest,
BuildStartedEvent,
BuildStatusEvent,
ExtraRepoState,
MergeRepoState,
NodeSet,
)
import zuul.model
@ -73,6 +75,7 @@ from zuul.zk.exceptions import JobRequestNotFound
from zuul.zk.executor import ExecutorApi
from zuul.zk.job_request_queue import JobRequestEvent
from zuul.zk.system import ZuulSystem
from zuul.zk.zkobject import ZKContext
BUFFER_LINES_FOR_SYNTAX = 200
COMMANDS = ['stop', 'pause', 'unpause', 'graceful', 'verbose',
@ -1031,6 +1034,7 @@ class AnsibleJob(object):
)
self.setNodeInfo()
self.loadRepoState()
self.ssh_agent.start()
self.ssh_agent.add(self.private_key_file)
@ -1152,6 +1156,22 @@ class AnsibleJob(object):
"Unable to return nodeset %s", self.nodeset
)
def loadRepoState(self):
merge_rs_path = self.arguments['merge_repo_state_ref']
merge_repo_state = merge_rs_path and MergeRepoState.fromZK(
self.executor_server.zk_context, merge_rs_path)
extra_rs_path = self.arguments['extra_repo_state_ref']
extra_repo_state = extra_rs_path and ExtraRepoState.fromZK(
self.executor_server.zk_context, extra_rs_path)
d = {}
for rs in (merge_repo_state, extra_repo_state):
if not rs:
continue
for connection in rs.state.keys():
d.setdefault(connection, {}).update(
rs.state.get(connection, {}))
self.repo_state = d
def _base_job_data(self):
data = {
# TODO(mordred) worker_name is needed as a unique name for the
@ -1181,7 +1201,6 @@ class AnsibleJob(object):
self.log.debug("Job root: %s" % (self.jobdir.root,))
tasks = []
projects = set()
repo_state = args['repo_state']
with open(self.jobdir.job_output_file, 'a') as job_output:
job_output.write("{now} | Updating repositories\n".format(
@ -1192,7 +1211,7 @@ class AnsibleJob(object):
self.log.debug("Updating project %s" % (project,))
tasks.append(self.executor_server.update(
project['connection'], project['name'],
repo_state=repo_state,
repo_state=self.repo_state,
zuul_event_id=self.zuul_event_id,
build=self.build_request.uuid))
projects.add((project['connection'], project['name']))
@ -1211,7 +1230,7 @@ class AnsibleJob(object):
self.log.debug("Updating playbook or role %s" % (
repo['project'],))
tasks.append(self.executor_server.update(
*key, repo_state=repo_state,
*key, repo_state=self.repo_state,
zuul_event_id=self.zuul_event_id,
build=self.build_request.uuid))
projects.add(key)
@ -1235,7 +1254,7 @@ class AnsibleJob(object):
# Take refs and branches from repo state
project_repo_state = \
repo_state[task.connection_name][task.project_name]
self.repo_state[task.connection_name][task.project_name]
# All branch names
branches = [
ref[11:] # strip refs/heads/
@ -1287,7 +1306,7 @@ class AnsibleJob(object):
merge_items = [i for i in args['items'] if i.get('number')]
if merge_items:
item_commit = self.doMergeChanges(
merger, merge_items, repo_state, restored_repos)
merger, merge_items, self.repo_state, restored_repos)
if item_commit is None:
# There was a merge conflict and we have already sent
# a work complete result, don't run any jobs
@ -1302,7 +1321,7 @@ class AnsibleJob(object):
if (project['connection'], project['name']) in restored_repos:
continue
merger.setRepoState(
project['connection'], project['name'], repo_state,
project['connection'], project['name'], self.repo_state,
process_worker=self.executor_server.process_worker)
# Early abort if abort requested
@ -2045,7 +2064,7 @@ class AnsibleJob(object):
merger.checkoutBranch(
project.connection_name, project.name,
branch,
repo_state=args['repo_state'],
repo_state=self.repo_state,
process_worker=self.executor_server.process_worker,
zuul_event_id=self.zuul_event_id)
else:
@ -2096,7 +2115,7 @@ class AnsibleJob(object):
# If we don't have this repo yet prepared we need to restore
# the repo state. Otherwise we have speculative merges in the
# repo and must not restore the repo state again.
repo_state = args['repo_state']
repo_state = self.repo_state
self.log.debug("Cloning %s@%s into new untrusted space %s",
project, branch, root)
@ -3055,6 +3074,7 @@ class ExecutorServer(BaseMergeServer):
self.keystore = KeyStorage(
self.zk_client,
password=self._get_key_store_password())
self.zk_context = ZKContext(self.zk_client, None, None, self.log)
self._running = False
self._command_running = False
# TODOv3(mordred): make the executor name more unique --