Avoid unnecessary change dependency updates

Updating commit and topic dependencies incurres a cost as we query the
source system for the change details.

The current implementation will update the commit and topic dependencies
independent of whether or not the dependencies are already populated and
when they were last updated. This can lead to multiple updates for the
same change in a short amount of time e.g. when an event leads to a
change to be added to multiple pipelines or when a circular dependency
is enqueued.

Instead we can use the following conditions to determine if the
dependencies need to be refreshed:

1. when `updateCommitDependencies()` is called without an event (force a
   dependency refresh)
2. when the change's commit or topic dependencies were never updated
3. when the event ltime is greater than the last modified zxid of the
   change (dependencies could have change in the meantime)

Change-Id: I4fd6c0d4cf2839010ddf7105a7db12da06ef1074
This commit is contained in:
Simon Westphahl 2022-12-12 15:20:12 +01:00 committed by James E. Blair
parent cb40ddc7db
commit d1f34d506a

View File

@ -827,37 +827,53 @@ class PipelineManager(metaclass=ABCMeta):
def updateCommitDependencies(self, change, change_queue, event):
log = get_annotated_logger(self.log, event)
# Search for Depends-On headers and find appropriate changes
log.debug(" Updating commit dependencies for %s", change)
dependencies = []
seen = set()
for match in find_dependency_headers(change.message):
log.debug(" Found Depends-On header: %s", match)
if match in seen:
continue
seen.add(match)
try:
url = urllib.parse.urlparse(match)
except ValueError:
continue
source = self.sched.connections.getSourceByHostname(
url.hostname)
if not source:
continue
log.debug(" Found source: %s", source)
dep = source.getChangeByURLWithRetry(match, event)
if dep and (not dep.is_merged) and dep not in dependencies:
log.debug(" Adding dependency: %s", dep)
dependencies.append(dep)
new_commit_needs_changes = [d.cache_key for d in dependencies]
must_update_commit_deps = (
not hasattr(event, "zuul_event_ltime")
or change.commit_needs_changes is None
or change.cache_stat.mzxid <= event.zuul_event_ltime
)
update_attrs = dict(commit_needs_changes=new_commit_needs_changes)
must_update_topic_deps = (
self.useDependenciesByTopic(change.project) and (
not hasattr(event, "zuul_event_ltime")
or change.topic_needs_changes is None
or change.cache_stat.mzxid <= event.zuul_event_ltime
)
)
update_attrs = {}
if must_update_commit_deps:
# Search for Depends-On headers and find appropriate changes
log.debug(" Updating commit dependencies for %s", change)
dependencies = []
seen = set()
for match in find_dependency_headers(change.message):
log.debug(" Found Depends-On header: %s", match)
if match in seen:
continue
seen.add(match)
try:
url = urllib.parse.urlparse(match)
except ValueError:
continue
source = self.sched.connections.getSourceByHostname(
url.hostname)
if not source:
continue
log.debug(" Found source: %s", source)
dep = source.getChangeByURLWithRetry(match, event)
if dep and (not dep.is_merged) and dep not in dependencies:
log.debug(" Adding dependency: %s", dep)
dependencies.append(dep)
new_commit_needs_changes = [d.cache_key for d in dependencies]
update_attrs = dict(commit_needs_changes=new_commit_needs_changes)
# Ask the source for any tenant-specific changes (this allows
# drivers to implement their own way of collecting deps):
source = self.sched.connections.getSource(
change.project.connection_name)
if self.useDependenciesByTopic(change.project):
if must_update_topic_deps:
log.debug(" Updating topic dependencies for %s", change)
new_topic_needs_changes = []
for dep in source.getChangesByTopic(change.topic):
@ -866,7 +882,8 @@ class PipelineManager(metaclass=ABCMeta):
new_topic_needs_changes.append(dep.cache_key)
update_attrs['topic_needs_changes'] = new_topic_needs_changes
source.setChangeAttributes(change, **update_attrs)
if update_attrs:
source.setChangeAttributes(change, **update_attrs)
def provisionNodes(self, item):
log = item.annotateLogger(self.log)