Trace merge requests and merger operations

The span info for the different merger operations is stored on the
request and will be returned to the scheduler via the result event.

This also adds the request UUID to the "refstat" job so that we can
attach that as a span attribute.

Change-Id: Ib6ac7b5e7032d168f53fe32e28358bd0b87df435
This commit is contained in:
Simon Westphahl 2022-09-13 14:30:14 +02:00
parent 075bdd0178
commit f1e3d67608
No known key found for this signature in database
5 changed files with 104 additions and 26 deletions

View File

@ -31,6 +31,16 @@ class TestTracing(ZuulTestCase):
config_file = 'zuul-tracing.conf'
tenant_config_file = "config/single-tenant/main.yaml"
def _waitForSpans(self, *span_names, timeout=60):
    """Poll the collected OTLP requests until the expected spans show up.

    A request counts as a match when the name of its first span (first
    resource, first scope) is one of ``span_names``.  Polling stops as
    soon as the number of matching requests equals the number of names.

    :arg str span_names: Names of the spans to wait for.
    :arg timeout: Passed to ``iterate_timeout`` as the time limit.
    :returns: The matching requests, in the order they are stored in
        ``self.otlp.requests``.
    """
    def leading_span_name(request):
        # Only the very first span of a request is inspected.
        return request.resource_spans[0].scope_spans[0].spans[0].name

    # NOTE(review): presumably iterate_timeout raises once the timeout
    # expires, so no fallback return is needed -- confirm.
    for _ in iterate_timeout(timeout, "requests to arrive"):
        matched = []
        for request in self.otlp.requests:
            if leading_span_name(request) in span_names:
                matched.append(request)
        if len(matched) != len(span_names):
            continue
        return matched
def test_tracing_api(self):
tracer = trace.get_tracer("zuul")
@ -68,10 +78,10 @@ class TestTracing(ZuulTestCase):
tracing.endSavedSpan(span_info, end_time=time.time(),
attributes={'endattr': 'baz'})
for _ in iterate_timeout(60, "request to arrive"):
if len(self.otlp.requests) == 4:
break
req1 = self.otlp.requests[0]
test_requests = self._waitForSpans(
"parent-trace", "child1-trace", "child2-trace", "child3-trace")
req1 = test_requests[0]
self.log.debug("Received:\n%s", req1)
attrs = attributes_to_dict(req1.resource_spans[0].resource.attributes)
self.assertEqual({"service.name": "zuultest"}, attrs)
@ -80,7 +90,7 @@ class TestTracing(ZuulTestCase):
span1 = req1.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child1-trace", span1.name)
req2 = self.otlp.requests[1]
req2 = test_requests[1]
self.log.debug("Received:\n%s", req2)
span2 = req2.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child2-trace", span2.name)
@ -88,12 +98,12 @@ class TestTracing(ZuulTestCase):
attrs = attributes_to_dict(span2.links[0].attributes)
self.assertEqual({"relationship": "prev"}, attrs)
req3 = self.otlp.requests[2]
req3 = test_requests[2]
self.log.debug("Received:\n%s", req3)
span3 = req3.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child3-trace", span3.name)
req4 = self.otlp.requests[3]
req4 = test_requests[3]
self.log.debug("Received:\n%s", req4)
span4 = req4.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("parent-trace", span4.name)
@ -138,10 +148,10 @@ class TestTracing(ZuulTestCase):
# End our root span manually.
span.end(end_time=time.time())
for _ in iterate_timeout(60, "request to arrive"):
if len(self.otlp.requests) == 3:
break
req1 = self.otlp.requests[0]
test_requests = self._waitForSpans(
"child1-trace", "child2-trace", "child3-trace")
req1 = test_requests[0]
self.log.debug("Received:\n%s", req1)
attrs = attributes_to_dict(req1.resource_spans[0].resource.attributes)
self.assertEqual({"service.name": "zuultest"}, attrs)
@ -150,7 +160,7 @@ class TestTracing(ZuulTestCase):
span1 = req1.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child1-trace", span1.name)
req2 = self.otlp.requests[1]
req2 = test_requests[1]
self.log.debug("Received:\n%s", req2)
span2 = req2.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child2-trace", span2.name)
@ -158,7 +168,7 @@ class TestTracing(ZuulTestCase):
attrs = attributes_to_dict(span2.links[0].attributes)
self.assertEqual({"relationship": "prev"}, attrs)
req3 = self.otlp.requests[2]
req3 = test_requests[2]
self.log.debug("Received:\n%s", req3)
span3 = req3.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child3-trace", span3.name)
@ -181,6 +191,8 @@ class TestTracing(ZuulTestCase):
self.log.debug("Received:\n%s", buildset)
item = self.getSpan('QueueItem')
self.log.debug("Received:\n%s", item)
merge_job = self.getSpan('Merge')
self.log.debug("Received:\n%s", merge_job)
build = self.getSpan('Build')
self.log.debug("Received:\n%s", build)
job = self.getSpan('JobExecution')
@ -192,6 +204,10 @@ class TestTracing(ZuulTestCase):
item.start_time_unix_nano)
self.assertTrue(buildset.end_time_unix_nano <=
item.end_time_unix_nano)
self.assertTrue(merge_job.start_time_unix_nano >=
buildset.start_time_unix_nano)
self.assertTrue(merge_job.end_time_unix_nano <=
buildset.end_time_unix_nano)
item_attrs = attributes_to_dict(item.attributes)
self.assertTrue(item_attrs['ref_number'] == "1")
self.assertTrue(item_attrs['ref_patchset'] == "1")

View File

@ -17,6 +17,7 @@ from uuid import uuid4
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
import zuul.lib.tracing as tracing
from zuul.model import (
FilesChangesCompletedEvent,
MergeCompletedEvent,
@ -27,13 +28,23 @@ from zuul.model import (
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.merger import MergerApi
from zuul.zk.exceptions import JobRequestNotFound
from kazoo.exceptions import BadVersionError, NoNodeError
from opentelemetry import trace
# Name of the tracing span to start for each merge request job type;
# looked up by job type when a merge job is submitted.
_JOB_TYPE_TO_SPAN_NAME = {
MergeRequest.MERGE: "Merge",
MergeRequest.CAT: "Cat",
MergeRequest.REF_STATE: "RefState",
MergeRequest.FILES_CHANGES: "FilesChanges",
}
class MergeClient(object):
log = logging.getLogger("zuul.MergeClient")
_merger_api_class = MergerApi
tracer = trace.get_tracer("zuul")
def __init__(self, config, sched):
self.config = config
@ -63,26 +74,33 @@ class MergeClient(object):
build_set_uuid = None
tenant_name = None
pipeline_name = None
parent_span = None
if build_set is not None:
build_set_uuid = build_set.uuid
tenant_name = build_set.item.pipeline.tenant.name
pipeline_name = build_set.item.pipeline.name
parent_span = tracing.restoreSpan(build_set.span_info)
with trace.use_span(parent_span):
job_span = self.tracer.start_span(_JOB_TYPE_TO_SPAN_NAME[job_type])
uuid = str(uuid4().hex)
log = get_annotated_logger(self.log, event)
log.debug("Submitting job %s with data %s", uuid, data)
request = MergeRequest(
uuid=uuid,
job_type=job_type,
build_set_uuid=build_set_uuid,
tenant_name=tenant_name,
pipeline_name=pipeline_name,
event_id=event.zuul_event_id if event else None,
precedence=precedence
)
with trace.use_span(job_span):
request = MergeRequest(
uuid=uuid,
job_type=job_type,
build_set_uuid=build_set_uuid,
tenant_name=tenant_name,
pipeline_name=pipeline_name,
event_id=event.zuul_event_id if event else None,
precedence=precedence,
span_info=tracing.getSpanInfo(job_span),
)
return self.merger_api.submit(request, data,
needs_result=needs_result)
@ -159,9 +177,11 @@ class MergeClient(object):
"via result event for %s", merge_request)
if merge_request.job_type == MergeRequest.FILES_CHANGES:
event = FilesChangesCompletedEvent(
merge_request.uuid,
merge_request.build_set_uuid,
files=None,
elapsed_time=None,
span_info=merge_request.span_info,
)
else:
event = MergeCompletedEvent(
@ -175,6 +195,7 @@ class MergeClient(object):
item_in_branches=None,
errors=None,
elapsed_time=None,
span_info=merge_request.span_info,
)
try:
self.result_events[merge_request.tenant_name][

View File

@ -28,6 +28,7 @@ from kazoo.exceptions import NoNodeError
from zuul.lib import commandsocket
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
from zuul.lib import tracing
from zuul.merger import merger
from zuul.merger.merger import nullcontext
from zuul.model import (
@ -94,6 +95,7 @@ class BaseMergeServer(metaclass=ABCMeta):
self.config = config
self.tracing = tracing.Tracing(self.config)
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
@ -180,6 +182,7 @@ class BaseMergeServer(metaclass=ABCMeta):
self._merger_running = False
self.merger_loop_wake_event.set()
self.zk_client.disconnect()
self.tracing.stop()
def join(self):
self.merger_loop_wake_event.set()
@ -204,7 +207,11 @@ class BaseMergeServer(metaclass=ABCMeta):
for merge_request in self.merger_api.next():
if not self._merger_running:
break
self._runMergeJob(merge_request)
with tracing.startSpanInContext(
merge_request.span_context, "MergerJob",
attributes={"merger": self.hostname}):
self._runMergeJob(merge_request)
except Exception:
self.log.exception("Error in merge thread:")
time.sleep(5)
@ -411,9 +418,11 @@ class BaseMergeServer(metaclass=ABCMeta):
)
if merge_request.job_type == MergeRequest.FILES_CHANGES:
event = FilesChangesCompletedEvent(
merge_request.uuid,
merge_request.build_set_uuid,
files,
elapsed_time,
merge_request.span_info,
)
else:
event = MergeCompletedEvent(
@ -427,6 +436,7 @@ class BaseMergeServer(metaclass=ABCMeta):
item_in_branches,
errors,
elapsed_time,
merge_request.span_info,
)
def put_complete_event(log, merge_request, event):

View File

@ -3473,13 +3473,14 @@ class MergeRequest(JobRequest):
def __init__(self, uuid, job_type, build_set_uuid, tenant_name,
pipeline_name, event_id, precedence=None, state=None,
result_path=None, span_context=None):
result_path=None, span_context=None, span_info=None):
super().__init__(uuid, precedence, state, result_path, span_context)
self.job_type = job_type
self.build_set_uuid = build_set_uuid
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.event_id = event_id
self.span_info = span_info
def toDict(self):
d = super().toDict()
@ -3489,6 +3490,7 @@ class MergeRequest(JobRequest):
"tenant_name": self.tenant_name,
"pipeline_name": self.pipeline_name,
"event_id": self.event_id,
"span_info": self.span_info,
})
return d
@ -3505,6 +3507,7 @@ class MergeRequest(JobRequest):
state=data["state"],
result_path=data["result_path"],
span_context=data.get("span_context"),
span_info=data.get("span_info"),
)
def __repr__(self):
@ -6369,7 +6372,7 @@ class MergeCompletedEvent(ResultEvent):
def __init__(self, request_uuid, build_set_uuid, merged, updated,
commit, files, repo_state, item_in_branches,
errors, elapsed_time):
errors, elapsed_time, span_info=None):
self.request_uuid = request_uuid
self.build_set_uuid = build_set_uuid
self.merged = merged
@ -6380,6 +6383,7 @@ class MergeCompletedEvent(ResultEvent):
self.item_in_branches = item_in_branches or []
self.errors = errors or []
self.elapsed_time = elapsed_time
self.span_info = span_info
def __repr__(self):
return ('<MergeCompletedEvent job: %s buildset: %s merged: %s '
@ -6400,6 +6404,7 @@ class MergeCompletedEvent(ResultEvent):
"item_in_branches": list(self.item_in_branches),
"errors": list(self.errors),
"elapsed_time": self.elapsed_time,
"span_info": self.span_info,
}
@classmethod
@ -6415,6 +6420,7 @@ class MergeCompletedEvent(ResultEvent):
list(data.get("item_in_branches", [])),
list(data.get("errors", [])),
data.get("elapsed_time"),
data.get("span_info"),
)
@ -6426,24 +6432,31 @@ class FilesChangesCompletedEvent(ResultEvent):
:arg float elapsed_time: Elapsed time of merge op in seconds.
"""
def __init__(self, build_set_uuid, files, elapsed_time):
def __init__(self, request_uuid, build_set_uuid, files, elapsed_time,
             span_info=None):
    """Store the result data of a completed files-changes merge request.

    :arg request_uuid: UUID of the merge request this event belongs to.
    :arg build_set_uuid: UUID of the build set of the merge request.
    :arg files: Changed files; any falsy value is stored as an empty
        list.
    :arg float elapsed_time: Elapsed time of merge op in seconds.
    :arg span_info: Span info taken from the merge request (optional).
    """
    # A missing (falsy) file list is normalized to an empty list.
    self.files = files if files else []
    self.span_info = span_info
    self.elapsed_time = elapsed_time
    self.build_set_uuid = build_set_uuid
    self.request_uuid = request_uuid
def toDict(self):
    """Serialize the event into a plain dictionary.

    The file collection is copied into a new list; all other attributes
    are stored as they are.
    """
    serialized = dict(
        request_uuid=self.request_uuid,
        build_set_uuid=self.build_set_uuid,
        files=list(self.files),
        elapsed_time=self.elapsed_time,
        span_info=self.span_info,
    )
    return serialized
@classmethod
def fromDict(cls, data):
    """Build an event from a dictionary such as the one made by toDict.

    Missing keys fall back to ``None`` (or an empty list for "files"),
    so dictionaries without "request_uuid" or "span_info" are accepted.
    """
    request_uuid = data.get("request_uuid")
    build_set_uuid = data.get("build_set_uuid")
    # Always hand a fresh list to the constructor.
    files = list(data.get("files", []))
    elapsed_time = data.get("elapsed_time")
    span_info = data.get("span_info")
    return cls(request_uuid, build_set_uuid, files, elapsed_time, span_info)

View File

@ -2754,12 +2754,30 @@ class Scheduler(threading.Thread):
build_set = self._getBuildSetFromPipeline(event, pipeline)
if not build_set:
return
tracing.endSavedSpan(
event.span_info,
attributes={
"uuid": event.request_uuid,
"buildset_uuid": build_set.uuid,
"zuul_event_id": build_set.item.event.zuul_event_id,
}
)
pipeline.manager.onMergeCompleted(event, build_set)
def _doFilesChangesCompletedEvent(self, event, pipeline):
    """End the saved request span and pass the event to the manager.

    Nothing happens when no build set can be found for the event.
    Otherwise the span saved in ``event.span_info`` is ended with the
    request and build set identifiers attached as attributes, and the
    pipeline manager is notified.
    """
    build_set = self._getBuildSetFromPipeline(event, pipeline)
    if build_set:
        span_attributes = {
            "uuid": event.request_uuid,
            "buildset_uuid": build_set.uuid,
            "zuul_event_id": build_set.item.event.zuul_event_id,
        }
        tracing.endSavedSpan(event.span_info, attributes=span_attributes)
        pipeline.manager.onFilesChangesCompleted(event, build_set)
def _doNodesProvisionedEvent(self, event, pipeline):