Index job map by uuid

This is part of the circular dependency refactor.  It changes the
job map (a dictionary shared by the BuildSet and JobGraph classes;
BuildSet.jobs is JobGraph._job_map, since JobGraph is really just a
class that encapsulates some logic for BuildSet) to be indexed by
FrozenJob.uuid instead of job name.  This helps prepare for
supporting multiple jobs with the same name in a buildset.

Change-Id: Ie17dcf2dd0d086bd18bb3471592e32dcbb8b8bda
James E. Blair
2023-12-05 15:44:51 -08:00
parent 071c48c5ae
commit cb3c4883f2
8 changed files with 151 additions and 60 deletions
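
To make the shape of the change concrete, here is a minimal standalone sketch of the new indexing scheme. FrozenJob and JobGraph below are illustrative stand-ins rather than Zuul's real classes; the point is only that a uuid-keyed map tolerates two frozen jobs that share a name.

# Minimal illustration only; Zuul's real FrozenJob/JobGraph carry far
# more state than this.
import uuid


class FrozenJob:
    def __init__(self, name):
        self.name = name
        self.uuid = uuid.uuid4().hex   # unique even when names collide


class JobGraph:
    def __init__(self, job_map):
        # job_map is shared with the owning BuildSet (BuildSet.jobs).
        self._job_map = job_map
        self.jobs = []        # job names, in layout order
        self.job_uuids = []   # job uuids, in the same order

    def addJob(self, job):
        # Keying by uuid means a second job named "build" no longer
        # collides with the first.
        self._job_map[job.uuid] = job
        self.jobs.append(job.name)
        self.job_uuids.append(job.uuid)


shared_map = {}
graph = JobGraph(shared_map)
graph.addJob(FrozenJob('build'))
graph.addJob(FrozenJob('build'))   # same name, distinct uuid
assert len(shared_map) == 2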

View File

@@ -168,3 +168,9 @@ Version 21
:Prior Zuul version: 9.3.0
:Description: Add job_dependencies and job_dependents fields to job graphs.
Affects schedulers.
Version 22
----------
:Prior Zuul version: 9.3.0
:Description: Add model_version field to job graphs and index jobs by uuid.
Affects schedulers.
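
The model_version field added here is what lets the new key scheme roll out gradually across a running cluster. A rough sketch of the gating pattern, with an in-memory stand-in for Zuul's component registry (names illustrative): the graph records the model API level it saw at instantiation and keeps honoring it for its own lifetime.

# Illustrative sketch of version-pinned behavior; the real registry is
# backed by ZooKeeper and the real JobGraph is far more involved.
class FakeComponentRegistry:
    model_api = 21  # cluster-wide minimum model API; changes at runtime


class JobGraph:
    def __init__(self):
        # Pin behavior at instantiation time so a half-built graph does
        # not switch key schemes mid-freeze.
        self.model_version = 0
        if FakeComponentRegistry.model_api >= 22:
            self.model_version = 22

    def keyFor(self, job):
        # Old scheme keys the job map by name; new scheme keys by uuid.
        return job.name if self.model_version < 22 else job.uuid


old_graph = JobGraph()                # created while the cluster is at 21
FakeComponentRegistry.model_api = 22  # an upgrade lands mid-flight
new_graph = JobGraph()
assert old_graph.model_version == 0
assert new_graph.model_version == 22

Pinning per object rather than consulting the registry on every call keeps a graph created under the old scheme readable after the upgrade completes.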

View File

@@ -354,6 +354,8 @@ Work Items
later, but again, will work for the n=1 case now. This can be
done as an early standalone change.
* Index the build dictionary on the BuildSet by job uuid.
* Update items to support multiple changes. This is likely to be a
large change where we simultaneously update anything where we can't
support both systems ahead of time.

View File

@@ -231,6 +231,7 @@ class TestJob(BaseTestCase):
change = model.Change(self.project)
change.branch = 'master'
change.cache_stat = Dummy(key=Dummy(reference=uuid.uuid4().hex))
item = self.queue.enqueueChange(change, None)
self.assertTrue(base.changeMatchesBranch(change))
@@ -247,6 +248,7 @@ class TestJob(BaseTestCase):
self.assertEqual(job.timeout, 70)
change.branch = 'stable/diablo'
change.cache_stat = Dummy(key=Dummy(reference=uuid.uuid4().hex))
item = self.queue.enqueueChange(change, None)
self.assertTrue(base.changeMatchesBranch(change))
@@ -296,6 +298,7 @@ class TestJob(BaseTestCase):
change = model.Change(self.project)
change.branch = 'master'
change.cache_stat = Dummy(key=Dummy(reference=uuid.uuid4().hex))
change.files = ['/COMMIT_MSG', 'ignored-file']
item = self.queue.enqueueChange(change, None)
@@ -371,6 +374,7 @@ class TestJob(BaseTestCase):
change = model.Change(self.project)
# Test master
change.branch = 'master'
change.cache_stat = Dummy(key=Dummy(reference=uuid.uuid4().hex))
item = self.queue.enqueueChange(change, None)
with testtools.ExpectedException(
Exception,
@@ -448,6 +452,7 @@ class TestJob(BaseTestCase):
change = model.Change(self.project)
change.branch = 'master'
change.cache_stat = Dummy(key=Dummy(reference=uuid.uuid4().hex))
item = self.queue.enqueueChange(change, None)
self.assertTrue(base.changeMatchesBranch(change))
@@ -482,6 +487,7 @@ class FakeFrozenJob(model.Job):
def __init__(self, name):
super().__init__(name)
self.uuid = uuid.uuid4().hex
self.ref = 'fake reference'
class TestGraph(BaseTestCase):
@@ -490,15 +496,6 @@ class TestGraph(BaseTestCase):
COMPONENT_REGISTRY.registry.model_api = MODEL_API
super().setUp()
def test_job_graph_disallows_multiple_jobs_with_same_name(self):
graph = model.JobGraph({})
job1 = FakeFrozenJob('job')
job2 = FakeFrozenJob('job')
graph.addJob(job1)
with testtools.ExpectedException(Exception,
"Job job already added"):
graph.addJob(job2)
def test_job_graph_disallows_circular_dependencies(self):
jobs = [FakeFrozenJob('job%d' % i) for i in range(0, 10)]

View File

@@ -719,15 +719,29 @@ class TestDataReturn(AnsibleZuulTestCase):
self.assertIn('- data-return-relative https://zuul.example.com',
A.messages[-1])
@model_version(20)
def test_model_20_21(self):
self._test_circ_dep_refactor()
# To 21
@model_version(18)
def test_model_18_21(self):
self._test_circ_dep_refactor()
self._test_circ_dep_refactor(21)
def _test_circ_dep_refactor(self):
@model_version(20)
def test_model_20_21(self):
self._test_circ_dep_refactor(21)
# To 22
@model_version(18)
def test_model_18_22(self):
self._test_circ_dep_refactor(22)
@model_version(20)
def test_model_20_22(self):
self._test_circ_dep_refactor(22)
@model_version(21)
def test_model_21_22(self):
self._test_circ_dep_refactor(22)
def _test_circ_dep_refactor(self, final_model_api):
# Test backwards compat for job graph dependency freezing.
# First test the entire lifecycle under the old api.
A = self.fake_gerrit.addFakeChange('org/project6', 'master', 'A')
@@ -751,10 +765,10 @@ class TestDataReturn(AnsibleZuulTestCase):
self.assertEqual(len(self.builds), 1)
# Upgrade our component
self.model_test_component_info.model_api = 21
self.model_test_component_info.model_api = final_model_api
component_registry = ComponentRegistry(self.zk_client)
for _ in iterate_timeout(30, "model api to update"):
if component_registry.model_api == 21:
if component_registry.model_api == final_model_api:
break
self.executor_server.hold_jobs_in_build = False
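
The upgrade tests above all follow the same rhythm: hold builds, bump the reported model API, wait for the component registry to observe the new value, then release the builds. Here is the waiting step reduced to a generic sketch; iterate_timeout and FakeRegistry are local illustrations, not imports from Zuul's test harness.

# Generic polling-with-deadline helper in the spirit of the test above;
# timing values are illustrative.
import time


def iterate_timeout(max_seconds, purpose):
    # Yield attempt numbers until the caller breaks out, or fail loudly
    # once the deadline passes.
    start = time.monotonic()
    count = 0
    while time.monotonic() - start < max_seconds:
        count += 1
        yield count
        time.sleep(0.1)
    raise TimeoutError("Timed out waiting for %s" % purpose)


class FakeRegistry:
    # Stands in for a registry whose value is updated asynchronously.
    model_api = 21


FakeRegistry.model_api = 22  # in the real test, the upgrade does this

for _ in iterate_timeout(30, "model api to update"):
    if FakeRegistry.model_api == 22:
        break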

View File

@@ -1936,7 +1936,7 @@ class PipelineManager(metaclass=ABCMeta):
# job) is within the current buildset (if it has been
# deduplicated).
try:
this_uuid = item.current_build_set.job_graph.getUuidForJob(
this_uuid = item.current_build_set.job_graph.getUuidForJobName(
job.name)
except ValueError:
# This doesn't currently raise a ValueError, it just

View File

@@ -2390,7 +2390,8 @@ class FrozenJob(zkobject.ZKObject):
self._set(_ready_to_run=False)
def __repr__(self):
return '<FrozenJob %s>' % (self.name)
name = getattr(self, 'name', '<UNKNOWN>')
return f'<FrozenJob {name}>'
def isEqual(self, other):
# Compare two frozen jobs to determine whether they are
@@ -2529,6 +2530,12 @@ class FrozenJob(zkobject.ZKObject):
# MODEL_API < 19
data.setdefault("uuid", None)
# MODEL_API < 22
if 'ref' not in data and hasattr(self, 'buildset'):
# buildset is provided on the scheduler, but not on the
# executor; but we don't need the ref on the executor.
data['ref'] = self.buildset.item.change.cache_key
if hasattr(self, 'nodeset_alternatives'):
alts = self.nodeset_alternatives
else:
@@ -3064,6 +3071,7 @@ class Job(ConfigObject):
kw['dependencies'] = frozenset(kw['dependencies'])
kw['semaphores'] = list(kw['semaphores'])
kw['failure_output'] = list(kw['failure_output'])
kw['ref'] = item.change.cache_key
# Don't add buildset to attributes since it's not serialized
kw['buildset'] = buildset
return FrozenJob.new(context, **kw)
@@ -3642,7 +3650,7 @@ class JobGraph(object):
self.jobs = []
# An ordered list of job UUIDs
self.job_uuids = []
# dependent_job_name -> dict(parent_job_name -> soft)
# dependent_job_uuid -> dict(parent_job_name -> soft)
self._dependencies = {}
# The correct terminology is "dependencies" and "dependents",
# which is confusing, but it's not appropriate to use terms
@@ -3657,6 +3665,14 @@ class JobGraph(object):
# Dict of {job_uuid: {child_uuid: {soft: bool}}}
self.job_dependents = {}
self.project_metadata = {}
# A temporary model version to help with the circular dep refactor
self.model_version = 0
# Not serialized
# Store the model api at the time this object was instantiated
# so we don't change behavior while freezing.
self.initial_model_api = COMPONENT_REGISTRY.model_api
if self.initial_model_api >= 22:
self.model_version = 22
def __repr__(self):
return '<JobGraph %s>' % (self.jobs)
@@ -3671,12 +3687,14 @@ class JobGraph(object):
"project_metadata": {
k: v.toDict() for (k, v) in self.project_metadata.items()
},
"model_version": self.model_version,
}
return data
@classmethod
def fromDict(klass, data, job_map):
self = klass(job_map)
self.model_version = data.get('model_version', self.model_version)
self.jobs = data['jobs']
# MODEL_API < 19: if job uuids is not set, we default the
# UUID for all jobs to None.
@@ -3694,28 +3712,69 @@ class JobGraph(object):
def addJob(self, job):
# A graph must be created after the job list is frozen,
# therefore we should only get one job with the same name.
if job.name in self.jobs:
raise Exception("Job %s already added" % (job.name,))
self._job_map[job.name] = job
if (self.model_version < 22):
job_id = job.name
else:
job_id = job.uuid
self._job_map[job_id] = job
self.jobs.append(job.name)
self.job_uuids.append(job.uuid)
# Append the dependency information
self._dependencies.setdefault(job.name, {})
self._dependencies.setdefault(job_id, {})
for dependency in job.dependencies:
self._dependencies[job.name][dependency.name] = dependency.soft
self._dependencies[job_id][dependency.name] = dependency.soft
def getJobs(self):
# Report in the order of layout cfg
return list([self._job_map[x] for x in self.jobs])
if (self.model_version < 22):
return [self._job_map[x] for x in self.jobs]
else:
return [self._job_map[x] for x in self.job_uuids]
def getUuidForJob(self, job_name):
def getJobIds(self):
if (self.model_version < 22):
return self.jobs
else:
return self.job_uuids
def getUuidForJobName(self, job_name):
return self.job_uuids[self.jobs.index(job_name)]
def getUuidForJobId(self, job_id):
if (self.model_version < 22):
return self.job_uuids[self.jobs.index(job_id)]
return job_id
def getNameForJobId(self, job_id):
if (self.model_version < 22):
return job_id
return self.jobs[self.job_uuids.index(job_id)]
def getJobFromUuid(self, job_uuid):
# TODO: remove when self.job_map is indexed by uuid
index = self.job_uuids.index(job_uuid)
name = self.jobs[index]
return self._job_map[name]
if (self.model_version < 22):
index = self.job_uuids.index(job_uuid)
name = self.jobs[index]
return self._job_map[name]
else:
return self._job_map[job_uuid]
def getJobFromName(self, job_name):
# TODO: this must be removed by completion of circular
# dependency refactor.
if (self.model_version < 22):
return self._job_map.get(job_name)
else:
try:
index = self.jobs.index(job_name)
except ValueError:
return None
uuid = self.job_uuids[index]
return self._job_map[uuid]
def getJob(self, name, ref):
for job in self._job_map.values():
if job.name == name and job.ref == ref:
return job
def getDirectDependentJobs(self, job):
# First, are we able to support the new method?
@@ -3792,8 +3851,8 @@ class JobGraph(object):
def freezeDependencies(self, layout=None):
if (COMPONENT_REGISTRY.model_api < 21):
return self._legacyCheckDependencies(layout)
for dependent_name, parents in self._dependencies.items():
dependent_uuid = self.getUuidForJob(dependent_name)
for dependent_id, parents in self._dependencies.items():
dependent_uuid = self.getUuidForJobId(dependent_id)
if dependent_uuid is None:
# MODEL_API < 21
self.job_dependencies = {}
@@ -3801,7 +3860,11 @@ class JobGraph(object):
return self._legacyCheckDependencies(layout)
dependencies = self.job_dependencies.setdefault(dependent_uuid, {})
for parent_name, parent_soft in parents.items():
if parent_name not in self._job_map:
dependent_job = self._job_map[dependent_id]
# We typically depend on jobs with the same ref (but
# this could later be modified by deduplication).
parent_job = self.getJob(parent_name, dependent_job.ref)
if parent_job is None:
if parent_soft:
if layout:
# If the caller supplied a layout, verify that the
@@ -3811,19 +3874,13 @@ class JobGraph(object):
continue
raise Exception(
"Job %s depends on %s which was not run." %
(dependent_name, parent_name))
# TODO: when we support multiple jobs with the same
# name, this lookup will be indexed by (name, ref), so
# that we typically depend on jobs with the same ref
# (but this could be modified by deduplication).
parent_job = self._job_map[parent_name]
(dependent_job.name, parent_name))
dependencies[parent_job.uuid] = dict(soft=parent_soft)
dependents = self.job_dependents.setdefault(
parent_job.uuid, {})
dependents[dependent_uuid] = dict(soft=parent_soft)
for dependent_name, parents in self._dependencies.items():
dependent_uuid = self.getUuidForJob(dependent_name)
dependent_job = self.getJobFromUuid(dependent_uuid)
for dependent_id, parents in self._dependencies.items():
dependent_job = self._job_map[dependent_id]
# For the side effect of verifying no cycles
self.getParentJobsRecursively(dependent_job)
@@ -4694,27 +4751,28 @@ class BuildSet(zkobject.ZKObject):
build_versions = data.get('build_versions', {})
# jobs (deserialize as separate objects)
if job_graph := data['job_graph']:
for job_name in job_graph.jobs:
for job_id in job_graph.getJobIds():
# If we have a current build before refreshing, we may
# be able to skip refreshing some items since they
# will not have changed.
job_name = job_graph.getNameForJobId(job_id)
build_path = data["builds"].get(job_name)
old_build = self.builds.get(job_name)
old_build_exists = (old_build
and old_build.getPath() == build_path)
if job_name in self.jobs:
job = self.jobs[job_name]
if job_id in self.jobs:
job = self.jobs[job_id]
if ((not old_build_exists) or
self.shouldRefreshJob(job, job_versions)):
tpe_jobs.append((None, job_name,
tpe_jobs.append((None, job_id, job_name,
tpe.submit(job.refresh, context)))
else:
job_uuid = job_graph.getUuidForJob(job_name)
job_uuid = job_graph.getUuidForJobId(job_id)
# MODEL_API < 19; use job_name if job_uuid is None
job_path = FrozenJob.jobPath(
job_uuid or job_name, self.getPath())
tpe_jobs.append(('job', job_name, tpe.submit(
tpe_jobs.append(('job', job_id, job_name, tpe.submit(
FrozenJob.fromZK, context, job_path, buildset=self)))
if build_path:
@@ -4723,7 +4781,7 @@ class BuildSet(zkobject.ZKObject):
if build and build.getPath() == build_path:
if self.shouldRefreshBuild(build, build_versions):
tpe_jobs.append((
None, job_name, tpe.submit(
None, job_id, job_name, tpe.submit(
build.refresh, context)))
else:
if not self._isMyBuild(build_path):
@@ -4732,7 +4790,7 @@ class BuildSet(zkobject.ZKObject):
builds[job_name] = build
else:
tpe_jobs.append((
'build', job_name, tpe.submit(
'build', job_id, job_name, tpe.submit(
Build.fromZK, context, build_path,
build_set=self)))
@@ -4748,23 +4806,23 @@ class BuildSet(zkobject.ZKObject):
retry_builds[job_name].append(retry_build)
else:
tpe_jobs.append((
'retry', job_name, tpe.submit(
'retry', job_id, job_name, tpe.submit(
Build.fromZK, context, retry_path,
build_set=self)))
for (kind, job_name, future) in tpe_jobs:
for (kind, job_id, job_name, future) in tpe_jobs:
result = future.result()
if kind == 'job':
self.jobs[job_name] = result
self.jobs[job_id] = result
elif kind == 'build':
# We normally set the job on the constructor, but we
# may not have had it in time. At this point though,
# the job future is guaranteed to have completed, so
# we can look it up now.
result._set(job=self.jobs[job_name])
result._set(job=self.jobs[job_id])
builds[job_name] = result
elif kind == 'retry':
result._set(job=self.jobs[job_name])
result._set(job=self.jobs[job_id])
retry_builds[job_name].append(result)
data.update({
@@ -5218,6 +5276,9 @@ class QueueItem(zkobject.ZKObject):
event = event_class.fromDict(data["event"]["data"])
change = self.pipeline.manager.resolveChangeReferences(
[data["change"]])[0]
# MODEL_API < 22: This can be removed once we remove the
# backwards-compat setting of FrozenJob.ref
self._set(change=change)
build_set = self.current_build_set
if build_set and build_set.getPath() == data["current_build_set"]:
@@ -5352,7 +5413,7 @@ class QueueItem(zkobject.ZKObject):
return self.current_build_set.job_graph.getJobs()
def getJob(self, name):
return self.current_build_set.jobs.get(name)
return self.current_build_set.job_graph.getJobFromName(name)
@property
def items_ahead(self):
@@ -6480,7 +6541,8 @@ class QueueItem(zkobject.ZKObject):
# which jobs have changed, so rather than run them
# all, just rely on the file matchers as-is.
return False
old_job = self.current_build_set._old_jobs.get(job.name)
old_job = self.current_build_set._old_job_graph.getJobFromName(
job.name)
if old_job is None:
log.debug("Found a newly created job")
return True # A newly created job
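
Taken together, the JobGraph changes above leave both key schemes live at once, with every lookup helper branching on model_version. A compressed, self-contained sketch of just the lookup logic (illustrative classes; the real JobGraph also tracks dependencies, dependents, and project metadata):

# Compressed sketch of the dual-scheme lookups; illustrative only.
import uuid


class FrozenJob:
    def __init__(self, name, ref):
        self.name = name
        self.ref = ref
        self.uuid = uuid.uuid4().hex


class JobGraph:
    def __init__(self, job_map, model_version):
        self._job_map = job_map          # keyed by name (<22) or uuid (>=22)
        self.model_version = model_version
        self.jobs = []                   # names in layout order
        self.job_uuids = []              # uuids in the same order

    def addJob(self, job):
        key = job.name if self.model_version < 22 else job.uuid
        self._job_map[key] = job
        self.jobs.append(job.name)
        self.job_uuids.append(job.uuid)

    def getJobIds(self):
        return self.jobs if self.model_version < 22 else self.job_uuids

    def getJobFromUuid(self, job_uuid):
        if self.model_version < 22:
            return self._job_map[self.jobs[self.job_uuids.index(job_uuid)]]
        return self._job_map[job_uuid]

    def getJobFromName(self, job_name):
        if self.model_version < 22:
            return self._job_map.get(job_name)
        try:
            return self._job_map[self.job_uuids[self.jobs.index(job_name)]]
        except ValueError:
            return None

    def getJob(self, name, ref):
        # Used when freezing dependencies: pick the parent with the
        # same ref as the dependent (subject to later deduplication).
        for job in self._job_map.values():
            if job.name == name and job.ref == ref:
                return job


graph = JobGraph({}, model_version=22)
job = FrozenJob('linters', ref='refs/changes/1')
graph.addJob(job)
assert graph.getJobFromName('linters') is job
assert graph.getJobFromUuid(job.uuid) is job

Callers like BuildSet.deserialize can then iterate getJobIds() and resolve names or uuids as needed without caring which scheme the graph was frozen under.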

View File

@@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# doc/source/developer/model-changelog.rst
MODEL_API = 21
MODEL_API = 22

View File

@@ -1762,7 +1762,7 @@ class ZuulWebAPI(object):
# would return the job with any in-change modifications.
item = self._freeze_jobs(
tenant, pipeline_name, project_name, branch_name)
job = item.current_build_set.jobs.get(job_name)
job = item.current_build_set.job_graph.getJobFromName(job_name)
if not job:
raise cherrypy.HTTPError(404)
@@ -1808,6 +1808,7 @@ class ZuulWebAPI(object):
change = Branch(project)
change.branch = branch_name or "master"
change.cache_stat = FakeCacheKey()
with LocalZKContext(self.log) as context:
queue = ChangeQueue.new(context, pipeline=pipeline)
item = QueueItem.new(context, queue=queue, change=change)
@@ -1934,6 +1935,15 @@ class StreamManager(object):
self.emitStats()
class FakeCacheKey:
class Dummy():
pass
def __init__(self):
self.key = self.Dummy()
self.key.reference = uuid.uuid4().hex
class ZuulWeb(object):
log = logging.getLogger("zuul.web")
tracer = trace.get_tracer("zuul")
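
Finally, both the unit tests and zuul-web now give synthetic changes a cache_stat because a frozen job's ref is derived from its change's cache key. The stand-in pattern, sketched with illustrative names (FakeChange is not a real Zuul class; FakeCacheKey mirrors the helper added above):

# Illustrative sketch of the fixture pattern above: each synthetic
# change gets a unique cache reference so per-change identity exists
# without a real change cache behind it.
import uuid


class FakeCacheKey:
    class Dummy:
        pass

    def __init__(self):
        self.key = self.Dummy()
        self.key.reference = uuid.uuid4().hex


class FakeChange:
    def __init__(self, branch):
        self.branch = branch
        self.cache_stat = FakeCacheKey()


a = FakeChange('master')
b = FakeChange('master')
# Two synthetic changes on the same branch remain distinguishable.
assert a.cache_stat.key.reference != b.cache_stat.key.reference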