zuul/zuul/merger/client.py
James E. Blair 1f026bd49c Finish circular dependency refactor
This change completes the circular dependency refactor.

The principal change is that queue items may now include
more than one change simultaneously in the case of circular
dependencies.

In dependent pipelines, the two-phase reporting process is
simplified because it happens during processing of a single
item.

In independent pipelines, non-live items are still used for
linear dependencies, but multi-change items are used for
circular dependencies.

Previously, changes were enqueued recursively and then
bundles were made out of the resulting items.  Since we now
need to enqueue entire cycles in one queue item, the
dependency graph generation is performed at the start of
enqueuing the first change in a cycle.

Some tests exercise situations where Zuul is processing
events for old patchsets of changes.  The new change query
sequence mentioned in the previous paragraph requires more
accurate information about out-of-date patchsets than the
previous sequence did; therefore, the Gerrit driver has been
updated to query and return more data about non-current
patchsets.

This change is not backwards compatible with the existing
ZK schema and will require Zuul systems to delete all
pipeline states during the upgrade.  A later change will
implement a helper command for this.

All backwards-compatibility handling for the last several
model_api versions, which were added to prepare for this
upgrade, has been removed.  In general, all model data
structures involving frozen jobs are now indexed by the
frozen job's uuid and no longer include the job name, since
a job name no longer uniquely identifies a job in a buildset
(either the uuid or the (job name, change) tuple must be
used to identify it).

Job deduplication is simplified and now only needs to
consider jobs within the same buildset.

The fake github driver had a bug (fakegithub.py line 694) where
it did not correctly increment the check run counter, so our
tests that verified we closed out obsolete check runs when
re-enqueuing were not valid.  This has been corrected, and
doing so has necessitated some changes around quiet dequeuing
when we re-enqueue a change.

The reporting in several drivers has been updated to support
reporting information about multiple changes in a queue item.

Change-Id: I0b9e4d3f9936b1e66a08142fc36866269dc287f1
Depends-On: https://review.opendev.org/907627
2024-02-09 07:39:40 -08:00


# Copyright 2014 OpenStack Foundation
# Copyright 2021-2022, 2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
from uuid import uuid4

from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
import zuul.lib.tracing as tracing
from zuul.model import (
    FilesChangesCompletedEvent,
    MergeCompletedEvent,
    MergeRequest,
    PRECEDENCE_HIGH,
    PRECEDENCE_NORMAL,
)
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.merger import MergerApi
from zuul.zk.exceptions import JobRequestNotFound

from kazoo.exceptions import BadVersionError, NoNodeError
from opentelemetry import trace


_JOB_TYPE_TO_SPAN_NAME = {
    MergeRequest.MERGE: "Merge",
    MergeRequest.CAT: "Cat",
    MergeRequest.REF_STATE: "RefState",
    MergeRequest.FILES_CHANGES: "FilesChanges",
}


class MergeClient(object):
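    """Client used to submit merge work to mergers.

    Requests are distributed via the ZooKeeper-backed MergerApi; results
    come back either as a future (when needs_result=True) or as a
    pipeline result event for the associated buildset.
    """
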
    log = logging.getLogger("zuul.MergeClient")
    _merger_api_class = MergerApi
    tracer = trace.get_tracer("zuul")

    def __init__(self, config, sched):
        self.config = config
        self.sched = sched
        self.git_timeout = get_default(
            self.config, 'merger', 'git_timeout', 300)
        self.merger_api = self._merger_api_class(self.sched.zk_client)
        self.result_events = PipelineResultEventQueue.createRegistry(
            self.sched.zk_client
        )

    def submitJob(
        self,
        job_type,
        data,
        build_set,
        precedence=PRECEDENCE_NORMAL,
        needs_result=False,
        event=None,
    ):
        # We need the tenant, pipeline and queue names to put the merge
        # result in the correct queue. The only source for those values in
        # this context is the buildset. If no buildset is provided, we can't
        # provide a result event. In those cases a user of this function can
        # fall back to the return value which provides the result as a
        # future stored in a ZooKeeper path.
        build_set_uuid = None
        tenant_name = None
        pipeline_name = None
        parent_span = None

        if build_set is not None:
            build_set_uuid = build_set.uuid
            tenant_name = build_set.item.pipeline.tenant.name
            pipeline_name = build_set.item.pipeline.name
            parent_span = tracing.restoreSpan(build_set.span_info)

        with trace.use_span(parent_span):
            job_span = self.tracer.start_span(
                _JOB_TYPE_TO_SPAN_NAME[job_type])

        uuid = str(uuid4().hex)
        log = get_annotated_logger(self.log, event)
        log.debug("Submitting job %s with data %s", uuid, data)

        with trace.use_span(job_span):
            request = MergeRequest(
                uuid=uuid,
                job_type=job_type,
                build_set_uuid=build_set_uuid,
                tenant_name=tenant_name,
                pipeline_name=pipeline_name,
                event_id=event.zuul_event_id if event else None,
                precedence=precedence,
                span_info=tracing.getSpanInfo(job_span),
            )
            return self.merger_api.submit(request, data,
                                          needs_result=needs_result)

    def mergeChanges(self, items, build_set, files=None, dirs=None,
                     repo_state=None, precedence=PRECEDENCE_NORMAL,
                     branches=None, event=None):
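        # Submit a MERGE job that speculatively merges the given items
        # (optionally collecting files, dirs and repo state); the result
        # is reported asynchronously via a result event for the buildset.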
        data = dict(items=items,
                    files=files,
                    dirs=dirs,
                    repo_state=repo_state,
                    branches=branches)
        self.submitJob(
            MergeRequest.MERGE, data, build_set, precedence, event=event)

    def getRepoState(self, items, build_set, precedence=PRECEDENCE_NORMAL,
                     branches=None, event=None):
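        # Submit a REF_STATE job to collect the repo state for the given
        # items; like mergeChanges, the result arrives as a result event.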
        data = dict(items=items, branches=branches)
        self.submitJob(
            MergeRequest.REF_STATE, data, build_set, precedence, event=event)

    def getFiles(self, connection_name, project_name, branch, files, dirs=[],
                 precedence=PRECEDENCE_HIGH, event=None):
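        # Submit a CAT job to read file contents from a branch of a
        # project.  No buildset is passed, so the caller must use the
        # returned job future to obtain the result.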
        data = dict(connection=connection_name,
                    project=project_name,
                    branch=branch,
                    files=files,
                    dirs=dirs)
        job = self.submitJob(
            MergeRequest.CAT,
            data,
            None,
            precedence,
            needs_result=True,
            event=event,
        )
        return job

    def getFilesChanges(self, changes, precedence=PRECEDENCE_HIGH,
                        build_set=None, needs_result=False,
                        event=None):
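        # Submit a FILES_CHANGES job to determine which files are touched
        # by each of the given changes.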
        changes_data = []
        for change in changes:
            # if base_sha is not available, fallback to branch
            tosha = getattr(change, "base_sha", None)
            if tosha is None:
                tosha = getattr(change, "branch", None)
            changes_data.append(dict(
                connection=change.project.connection_name,
                project=change.project.name,
                branch=change.ref,
                tosha=tosha,
            ))
        data = dict(changes=changes_data)
        job = self.submitJob(
            MergeRequest.FILES_CHANGES,
            data,
            build_set,
            precedence,
            needs_result=needs_result,
            event=event,
        )
        return job

    def getFilesChangesRaw(self, connection_name, project_name, branch, tosha,
                           precedence=PRECEDENCE_HIGH,
                           build_set=None, needs_result=False,
                           event=None):
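        # Same as getFilesChanges, but for a single explicitly specified
        # connection/project/branch/tosha rather than change objects.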
        changes_data = [dict(
            connection=connection_name,
            project=project_name,
            branch=branch,
            tosha=tosha,
        )]
        data = dict(changes=changes_data)
        job = self.submitJob(
            MergeRequest.FILES_CHANGES,
            data,
            build_set,
            precedence,
            needs_result=needs_result,
            event=event,
        )
        return job

    def cleanupLostMergeRequests(self):
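        # Provide results for merge requests that the merger API reports
        # as lost (e.g. their merger went away before completing them).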
        for merge_request in self.merger_api.lostRequests():
            try:
                self.cleanupLostMergeRequest(merge_request)
            except Exception:
                self.log.exception(
                    "Exception cleaning up lost merge request:")

    def cleanupLostMergeRequest(self, merge_request):
        # Provide a result either via a result future or a result event
        if merge_request.result_path:
            self.log.debug(
                "Merge request cleanup providing synchronous result "
                "via future for %s", merge_request)
            result = {}
            self.merger_api.reportResult(merge_request, result)
        elif merge_request.build_set_uuid:
            self.log.debug(
                "Merge request cleanup providing asynchronous result "
                "via result event for %s", merge_request)
            if merge_request.job_type == MergeRequest.FILES_CHANGES:
                event = FilesChangesCompletedEvent(
                    merge_request.uuid,
                    merge_request.build_set_uuid,
                    files=None,
                    elapsed_time=None,
                    span_info=merge_request.span_info,
                )
            else:
                event = MergeCompletedEvent(
                    merge_request.uuid,
                    merge_request.build_set_uuid,
                    merged=False,
                    updated=False,
                    commit=None,
                    files=None,
                    repo_state=None,
                    item_in_branches=None,
                    errors=None,
                    elapsed_time=None,
                    span_info=merge_request.span_info,
                    zuul_event_id=merge_request.event_id,
                )
            try:
                self.result_events[merge_request.tenant_name][
                    merge_request.pipeline_name].put(event)
            except NoNodeError:
                self.log.warning("Pipeline was removed: %s",
                                 merge_request.pipeline_name)

        merge_request.state = MergeRequest.COMPLETED
        try:
            self.merger_api.update(merge_request)
            # No need to unlock the build, as it is by definition unlocked.
            # TODO (felix): If we want to optimize ZK requests, we could only
            # call the remove() here.
            self.merger_api.remove(merge_request)
        except JobRequestNotFound as e:
            self.log.warning("Could not complete merge: %s", str(e))
            # In case we re-created the lock directory, still remove
            # the request for the side effect of removing the lock.
            self.merger_api.remove(merge_request)
            return
        except BadVersionError:
            # There could be a race condition:
            # The merge request is found by lost_merge_requests in
            # state RUNNING but gets completed/unlocked before the
            # is_locked() check. Since we use the znode version, the
            # update will fail in this case and we can simply ignore
            # the exception.
            return

    def cancel(self, job):
        try:
            # Try to remove the request first
            request = self.merger_api.get(job.request_path)
            if request:
                if self.merger_api.lock(request, blocking=False):
                    try:
                        self.merger_api.remove(request)
                    finally:
                        self.merger_api.unlock(request)
        finally:
            # Regardless of that, remove the waiter node
            job.cancel()