Store FrozenJob data in separate znodes

To handle jobs which may return large volumes of parent data, store
the parent_data, secret_parent_data, and artifact_data in separate
shardable znodes.

Change-Id: I890c48abc7abdb460a3ced5b80e384ad5fba9bd7
James E. Blair 2021-10-19 15:25:16 -07:00
commit 0a336aab8d (parent 6565e7ada6)
3 changed files with 256 additions and 35 deletions
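Conceptually, the change keeps small job data inline on the FrozenJob znode and moves anything larger into a separate sharded znode that the job references only by path and content hash. A minimal sketch of that decision, assuming the 10 KiB threshold and the local/offload reference format that appear in the model diff below (the decide_storage helper itself is hypothetical):

import hashlib
import json

MAX_DATA_LEN = 10 * 1024  # same threshold as FrozenJob.MAX_DATA_LEN below

def decide_storage(data):
    # Hypothetical helper: keep small data inline, offload large data.
    serialized = json.dumps(data, sort_keys=True).encode('utf8')
    if len(serialized) > MAX_DATA_LEN:
        # Too large for the FrozenJob znode itself: the change writes a
        # separate sharded JobData znode and stores only a reference.
        return {'storage': 'offload',
                'hash': hashlib.sha256(serialized).hexdigest()}
    # Small enough to store directly with the FrozenJob.
    return {'storage': 'local', 'data': data}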

tests/fixtures/layouts/vars.yaml (new file)

@@ -0,0 +1,31 @@
- pipeline:
    name: check
    manager: independent
    trigger:
      gerrit:
        - event: patchset-created
    success:
      gerrit:
        Verified: 1
    failure:
      gerrit:
        Verified: -1

- job:
    name: base
    parent: null
    run: playbooks/base.yaml

- job:
    name: check-job
    run: playbooks/check.yaml
    vars:
      my_var: foo
    extra-vars:
      extra_var: bar

- project:
    name: org/project
    check:
      jobs:
        - check-job
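For orientation, check-job's vars and extra-vars in this fixture become the frozen job's variables and extra_variables job-data attributes; with the default threshold they are tiny and stay inline, and the test below shrinks MAX_DATA_LEN to force them into separate JobData znodes. Roughly (an illustration, not code from the change):

variables = {'my_var': 'foo'}           # from the job's vars
extra_variables = {'extra_var': 'bar'}  # from the job's extra-vars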

tests/unit/test_scheduler.py

@@ -6646,6 +6646,33 @@ For CI problems and help debugging, contact ci@example.org"""
        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
        self.waitUntilSettled()

    @simple_layout('layouts/vars.yaml')
    def test_jobdata(self):
        # Test the use of JobData objects for job variables
        self.executor_server.hold_jobs_in_build = True

        self.useFixture(fixtures.MonkeyPatch(
            'zuul.model.FrozenJob.MAX_DATA_LEN',
            1))

        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
        self.waitUntilSettled()

        # Make sure we're really using JobData objects
        tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
        item = tenant.layout.pipelines['check'].queues[0].queue[0]
        job = item.getJobs()[0]
        self.assertTrue(isinstance(job._variables, zuul.model.JobData))

        self.executor_server.hold_jobs_in_build = False
        self.executor_server.release()
        self.waitUntilSettled()

        self.assertHistory([
            dict(name='check-job', result='SUCCESS', changes='1,1'),
        ], ordered=False)
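        # Illustrative sketch, not part of the change: with the default
        # 10 KiB MAX_DATA_LEN these small variables would not be offloaded,
        # so the complementary assertion would be:
        #     self.assertTrue(isinstance(job._variables, dict))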

class TestChangeQueues(ZuulTestCase):
    tenant_config_file = 'config/change-queues/main.yaml'

zuul/model.py

@@ -1560,6 +1560,51 @@ class ZuulRole(Role):
        return self


class JobData(zkobject.ShardedZKObject):
    """Data or variables for a job.

    These can be arbitrarily large, so they are stored as sharded ZK objects.

    A hash attribute can be stored on the job object itself to detect
    whether the data need to be refreshed.
    """

    # We can always recreate data if necessary, so go ahead and
    # truncate when we update so we avoid corrupted data.
    truncate_on_create = True

    def __repr__(self):
        return '<JobData>'

    def getPath(self):
        return self._path

    @classmethod
    def new(klass, context, **kw):
        """Create a new instance and save it in ZooKeeper"""
        obj = klass()
        kw['hash'] = JobData.getHash(kw['data'])
        obj._set(**kw)
        obj._save(context, create=True)
        return obj

    @staticmethod
    def getHash(data):
        hasher = hashlib.sha256()
        # Use json_dumps to strip any ZuulMark entries
        hasher.update(json_dumps(data).encode('utf8'))
        return hasher.hexdigest()

    def serialize(self):
        data = {
            "data": self.data,
            "hash": self.hash,
            "_path": self._path,
        }
        return json.dumps(data).encode("utf8")
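# Illustrative sketch (not part of the diff): the hash recorded alongside the
# data lets a FrozenJob decide whether an offloaded object needs refreshing
# without reading the sharded znode back. The needs_refresh helper is
# hypothetical; only JobData.getHash above is real.
def needs_refresh(recorded_hash, candidate_data):
    # Compare the hash stored on the job against the candidate data's hash.
    return JobData.getHash(candidate_data) != recorded_hash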

class FrozenJob(zkobject.ZKObject):
    """A rendered job definition that will actually be run.
@@ -1571,6 +1616,72 @@ class FrozenJob(zkobject.ZKObject):
    variables which deal with the current state of the job in the
    pipeline.
    """

    # If data/variables are more than 10k, we offload them to another
    # object, otherwise we store them on this one.
    MAX_DATA_LEN = 10 * 1024

    attributes = ('ansible_version',
                  'dependencies',
                  'inheritance_path',
                  'name',
                  'nodeset',
                  'override_branch',
                  'override_checkout',
                  'post_timeout',
                  'required_projects',
                  'semaphores',
                  'tags',
                  'timeout',
                  'voting',
                  'queued',
                  'hold_following_changes',
                  'waiting_status',
                  'pre_run',
                  'run',
                  'post_run',
                  'cleanup_run',
                  'attempts',
                  'success_message',
                  'failure_message',
                  'provides',
                  'requires',
                  'workspace_scheme',
                  )

    job_data_attributes = ('artifact_data',
                           'extra_variables',
                           'group_variables',
                           'host_variables',
                           'secret_parent_data',
                           'variables',
                           'parent_data',
                           )

    @classmethod
    def new(klass, context, **kw):
        obj = klass()

        # Convert these to JobData after creation.
        job_data_vars = {}
        for k in klass.job_data_attributes:
            v = kw.pop(k, None)
            if v:
                # If the value is long, we need to make this a JobData;
                # otherwise we can use the dict as-is.
                if len(json_dumps(v).encode('utf8')) > klass.MAX_DATA_LEN:
                    job_data_vars[k] = v
                    v = None
            kw['_' + k] = v
        obj._set(**kw)
        obj._save(context, create=True)

        # If we need to make any JobData entries, do that now.
        update_kw = {}
        for (k, v) in job_data_vars.items():
            update_kw['_' + k] = obj._makeJobData(context, k, v)
        if update_kw:
            obj.updateAttributes(context, **update_kw)

        return obj

    def isBase(self):
        return self.parent is None
@@ -1584,9 +1695,6 @@ class FrozenJob(zkobject.ZKObject):
        return self.jobPath(self.name, self.buildset.getPath())

    def serialize(self):
-        # This needs a unique implementation; toDict only represents
-        # the surface level attributes, and the executor api
-        # serialization is not reusable.
        data = {}
        for k in self.attributes:
            v = getattr(self, k)
@@ -1606,6 +1714,16 @@ class FrozenJob(zkobject.ZKObject):
                     for (project_name, job_project) in v.items()}
            data[k] = v

        for k in self.job_data_attributes:
            v = getattr(self, '_' + k)
            if isinstance(v, JobData):
                v = {'storage': 'offload', 'path': v.getPath(), 'hash': v.hash}
            elif isinstance(v, dict):
                v = {'storage': 'local', 'data': v}
            else:
                v = None
            data[k] = v

        # Use json_dumps to strip any ZuulMark entries
        return json_dumps(data).encode("utf8")
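For reference, each job_data attribute serialized above takes one of two shapes; the values here are made-up placeholders, with the offload path following _makeJobData below (the FrozenJob's own path plus the attribute name):

# Small data is embedded directly in the FrozenJob znode:
{'storage': 'local', 'data': {'my_var': 'foo'}}
# Large data is replaced by a pointer to the sharded JobData znode:
{'storage': 'offload',
 'path': '<frozen-job-path>/variables',
 'hash': '<sha256 hexdigest of the data>'}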
@@ -1642,6 +1760,26 @@ class FrozenJob(zkobject.ZKObject):
        data['provides'] = frozenset(data['provides'])
        data['requires'] = frozenset(data['requires'])

        for job_data_key in self.job_data_attributes:
            job_data = data.pop(job_data_key, None)
            if job_data:
                # This is a dict which tells us where the actual data is.
                if job_data['storage'] == 'local':
                    # The data are stored locally in this dict
                    data['_' + job_data_key] = job_data['data']
                elif job_data['storage'] == 'offload':
                    existing_job_data = getattr(self, job_data_key, None)
                    if (getattr(existing_job_data, 'hash', None) ==
                            job_data['hash']):
                        # Re-use the existing object since it's the same
                        data['_' + job_data_key] = existing_job_data
                    else:
                        # Load the object from ZK
                        data['_' + job_data_key] = JobData.fromZK(
                            context, job_data['path'])
            else:
                data['_' + job_data_key] = None

        return data

    def setWaitingStatus(self, status):
@@ -1651,6 +1789,40 @@ class FrozenJob(zkobject.ZKObject):
            self.buildset.item.pipeline.manager.current_context,
            waiting_status=status)

    def _getJobData(self, name):
        val = getattr(self, name, None)
        if isinstance(val, JobData):
            return val.data
        return val

    @property
    def parent_data(self):
        return self._getJobData('_parent_data')

    @property
    def secret_parent_data(self):
        return self._getJobData('_secret_parent_data')

    @property
    def artifact_data(self):
        return self._getJobData('_artifact_data')

    @property
    def extra_variables(self):
        return self._getJobData('_extra_variables')

    @property
    def group_variables(self):
        return self._getJobData('_group_variables')

    @property
    def host_variables(self):
        return self._getJobData('_host_variables')

    @property
    def variables(self):
        return self._getJobData('_variables')

    @property
    def combined_variables(self):
        """
@@ -1710,24 +1882,39 @@ class FrozenJob(zkobject.ZKObject):
                    artifact_data.append(a)
        return parent_data, secret_parent_data, artifact_data

    def _makeJobData(self, context, name, data):
        # If the data is large, store it in another object
        if len(json_dumps(data).encode('utf8')) > self.MAX_DATA_LEN:
            return JobData.new(
                context, _path=self.getPath() + '/' + name,
                data=data)
        # Otherwise we can store it as a local dict
        return data

    def setParentData(self, parent_data, secret_parent_data, artifact_data):
        context = self.buildset.item.pipeline.manager.current_context
        kw = {}
        if self.parent_data != parent_data:
-            kw['parent_data'] = parent_data
            kw['_parent_data'] = self._makeJobData(
                context, 'parent_data', parent_data)
        if self.secret_parent_data != secret_parent_data:
-            kw['secret_parent_data'] = secret_parent_data
            kw['_secret_parent_data'] = self._makeJobData(
                context, 'secret_parent_data', secret_parent_data)
        if self.artifact_data != artifact_data:
-            kw['artifact_data'] = artifact_data
            kw['_artifact_data'] = self._makeJobData(
                context, 'artifact_data', artifact_data)
        if kw:
            self.updateAttributes(
                self.buildset.item.pipeline.manager.current_context,
                **kw)

    def setArtifactData(self, artifact_data):
        context = self.buildset.item.pipeline.manager.current_context
        if self.artifact_data != artifact_data:
            self.updateAttributes(
-                self.buildset.item.pipeline.manager.current_context,
-                artifact_data=artifact_data)
                context,
                _artifact_data=self._makeJobData(
                    context, 'artifact_data', artifact_data))
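# Illustrative usage sketch (not part of the diff): the properties above make
# the storage location transparent to callers, and setParentData() only
# updates attributes whose content actually changed. The frozen_job object
# and the *_data values are hypothetical placeholders:
#
#     variables = frozen_job.variables   # plain dict, inline or offloaded
#     frozen_job.setParentData(parent_data, secret_parent_data, artifact_data)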
class Job(ConfigObject):
@@ -1943,32 +2130,9 @@ class Job(ConfigObject):
                  redact_secrets_and_keys):
        buildset = item.current_build_set
        kw = {}
-        for k in ('ansible_version',
-                  'artifact_data',
-                  'dependencies',
-                  'extra_variables',
-                  'group_variables',
-                  'host_variables',
-                  'inheritance_path',
-                  'name',
-                  'nodeset',
-                  'override_branch',
-                  'override_checkout',
-                  'post_timeout',
-                  'required_projects',
-                  'secret_parent_data',
-                  'semaphores',
-                  'tags',
-                  'timeout',
-                  'voting',
-                  'queued',
-                  'hold_following_changes',
-                  'waiting_status',
-                  'pre_run', 'run', 'post_run', 'cleanup_run',
-                  'parent_data', 'variables', 'attempts',
-                  'success_message', 'failure_message',
-                  'provides', 'requires',
-                  'workspace_scheme'):
        attributes = (set(FrozenJob.attributes) |
                      set(FrozenJob.job_data_attributes))
        for k in attributes:
            # If this is a config object, it's frozen, so it's
            # safe to shallow copy.
            v = getattr(self, k)
@@ -1988,7 +2152,6 @@ class Job(ConfigObject):
            kw[k] = v
        kw['affected_projects'] = self._getAffectedProjects(tenant)
        kw['config_hash'] = self.getConfigHash(tenant)
-        kw['attributes'] = list(kw.keys())
        # Don't add buildset to attributes since it's not serialized
        kw['buildset'] = buildset
        return FrozenJob.new(context, **kw)
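Taken together, each offloaded attribute ends up in its own sharded znode directly beneath the FrozenJob, named after the attribute, as _makeJobData shows above. A small sketch of the resulting layout (the job_data_path helper is hypothetical; <frozen-job-path> stands in for FrozenJob.getPath()):

def job_data_path(frozen_job_path, name):
    # Offloaded JobData znodes are children of the FrozenJob, one per attribute.
    return frozen_job_path + '/' + name

# job_data_path('<frozen-job-path>', 'variables')     -> '<frozen-job-path>/variables'
# job_data_path('<frozen-job-path>', 'parent_data')   -> '<frozen-job-path>/parent_data'
# job_data_path('<frozen-job-path>', 'artifact_data') -> '<frozen-job-path>/artifact_data'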