Add span for builds and propagate via request

Change-Id: Ib10452862e7aa1355502bb381d3ff07c65ac7187
Co-Authored-By: James E. Blair <jim@acmegating.com>
Co-Authored-By: Tristan Cacqueray <tdecacqu@redhat.com>
This commit is contained in:
Simon Westphahl 2022-09-08 15:24:40 +02:00
parent c1845b02a4
commit 075bdd0178
No known key found for this signature in database
5 changed files with 72 additions and 21 deletions

View File

@ -181,8 +181,13 @@ class TestTracing(ZuulTestCase):
self.log.debug("Received:\n%s", buildset) self.log.debug("Received:\n%s", buildset)
item = self.getSpan('QueueItem') item = self.getSpan('QueueItem')
self.log.debug("Received:\n%s", item) self.log.debug("Received:\n%s", item)
build = self.getSpan('Build')
self.log.debug("Received:\n%s", build)
job = self.getSpan('JobExecution')
self.log.debug("Received:\n%s", job)
self.assertEqual(item.trace_id, buildset.trace_id) self.assertEqual(item.trace_id, buildset.trace_id)
self.assertNotEqual(item.span_id, buildset.span_id) self.assertEqual(item.trace_id, build.trace_id)
self.assertNotEqual(item.span_id, job.span_id)
self.assertTrue(buildset.start_time_unix_nano >= self.assertTrue(buildset.start_time_unix_nano >=
item.start_time_unix_nano) item.start_time_unix_nano)
self.assertTrue(buildset.end_time_unix_nano <= self.assertTrue(buildset.end_time_unix_nano <=

View File

@ -28,7 +28,10 @@ from zuul.model import (
from zuul.zk.event_queues import PipelineResultEventQueue from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.executor import ExecutorApi from zuul.zk.executor import ExecutorApi
from zuul.zk.exceptions import JobRequestNotFound from zuul.zk.exceptions import JobRequestNotFound
import zuul.lib.tracing as tracing
from kazoo.exceptions import BadVersionError from kazoo.exceptions import BadVersionError
from opentelemetry import trace
class ExecutorClient(object): class ExecutorClient(object):
@ -50,6 +53,7 @@ class ExecutorClient(object):
def execute(self, job, nodes, item, pipeline, executor_zone, def execute(self, job, nodes, item, pipeline, executor_zone,
dependent_changes=[], merger_items=[]): dependent_changes=[], merger_items=[]):
log = get_annotated_logger(self.log, item.event) log = get_annotated_logger(self.log, item.event)
tracer = trace.get_tracer("zuul")
uuid = str(uuid4().hex) uuid = str(uuid4().hex)
log.info( log.info(
"Execute job %s (uuid: %s) on nodes %s for change %s " "Execute job %s (uuid: %s) on nodes %s for change %s "
@ -63,11 +67,18 @@ class ExecutorClient(object):
# TODO: deprecate and remove this variable? # TODO: deprecate and remove this variable?
params["zuul"]["_inheritance_path"] = list(job.inheritance_path) params["zuul"]["_inheritance_path"] = list(job.inheritance_path)
parent_span = tracing.restoreSpan(item.current_build_set.span_info)
execute_time = time.time()
with trace.use_span(parent_span):
build_span = tracer.start_span("Build", start_time=execute_time)
build_span_info = tracing.getSpanInfo(build_span)
build = Build.new( build = Build.new(
pipeline.manager.current_context, pipeline.manager.current_context,
job=job, job=job,
build_set=item.current_build_set, build_set=item.current_build_set,
uuid=uuid, uuid=uuid,
execute_time=execute_time,
span_info=build_span_info,
zuul_event_id=item.event.zuul_event_id, zuul_event_id=item.event.zuul_event_id,
) )
@ -123,16 +134,17 @@ class ExecutorClient(object):
# Fall back to the default zone # Fall back to the default zone
executor_zone = None executor_zone = None
request = BuildRequest( with trace.use_span(build_span):
uuid=uuid, request = BuildRequest(
build_set_uuid=build.build_set.uuid, uuid=uuid,
job_name=job.name, build_set_uuid=build.build_set.uuid,
tenant_name=build.build_set.item.pipeline.tenant.name, job_name=job.name,
pipeline_name=build.build_set.item.pipeline.name, tenant_name=build.build_set.item.pipeline.tenant.name,
zone=executor_zone, pipeline_name=build.build_set.item.pipeline.name,
event_id=item.event.zuul_event_id, zone=executor_zone,
precedence=PRIORITY_MAP[pipeline.precedence] event_id=item.event.zuul_event_id,
) precedence=PRIORITY_MAP[pipeline.precedence],
)
self.executor_api.submit(request, params) self.executor_api.submit(request, params)
build.updateAttributes(pipeline.manager.current_context, build.updateAttributes(pipeline.manager.current_context,
build_request_ref=request.path) build_request_ref=request.path)

View File

@ -45,6 +45,7 @@ from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger from zuul.lib.logutil import get_annotated_logger
from zuul.lib.monitoring import MonitoringServer from zuul.lib.monitoring import MonitoringServer
from zuul.lib.statsd import get_statsd from zuul.lib.statsd import get_statsd
from zuul.lib import tracing
from zuul.lib import filecomments from zuul.lib import filecomments
from zuul.lib.keystorage import KeyStorage from zuul.lib.keystorage import KeyStorage
from zuul.lib.varnames import check_varnames from zuul.lib.varnames import check_varnames
@ -1099,6 +1100,13 @@ class AnsibleJob(object):
self.thread.join() self.thread.join()
def execute(self): def execute(self):
with tracing.startSpanInContext(
self.build_request.span_context,
'JobExecution',
attributes={'hostname': self.executor_server.hostname}):
self.do_execute()
def do_execute(self):
try: try:
self.time_starting_build = time.monotonic() self.time_starting_build = time.monotonic()
@ -3132,6 +3140,7 @@ class ExecutorServer(BaseMergeServer):
self.monitoring_server = MonitoringServer(self.config, 'executor', self.monitoring_server = MonitoringServer(self.config, 'executor',
self.component_info) self.component_info)
self.monitoring_server.start() self.monitoring_server.start()
self.tracing = tracing.Tracing(self.config)
self.log_streaming_port = log_streaming_port self.log_streaming_port = log_streaming_port
self.governor_lock = threading.Lock() self.governor_lock = threading.Lock()
self.run_lock = threading.Lock() self.run_lock = threading.Lock()
@ -3425,6 +3434,7 @@ class ExecutorServer(BaseMergeServer):
super().stop() super().stop()
self.stopRepl() self.stopRepl()
self.monitoring_server.stop() self.monitoring_server.stop()
self.tracing.stop()
self.log.debug("Stopped") self.log.debug("Stopped")
def join(self): def join(self):

View File

@ -33,6 +33,7 @@ import itertools
from kazoo.exceptions import NodeExistsError, NoNodeError from kazoo.exceptions import NodeExistsError, NoNodeError
from cachetools.func import lru_cache from cachetools.func import lru_cache
from opentelemetry import trace
from zuul.lib import yamlutil as yaml from zuul.lib import yamlutil as yaml
from zuul.lib.varnames import check_varnames from zuul.lib.varnames import check_varnames
@ -45,6 +46,7 @@ from zuul.lib.result_data import get_artifacts_from_result_data
from zuul.lib.logutil import get_annotated_logger from zuul.lib.logutil import get_annotated_logger
from zuul.lib.capabilities import capabilities_registry from zuul.lib.capabilities import capabilities_registry
from zuul.lib.jsonutil import json_dumps from zuul.lib.jsonutil import json_dumps
from zuul.lib import tracing
from zuul.zk import zkobject from zuul.zk import zkobject
from zuul.zk.blob_store import BlobStore from zuul.zk.blob_store import BlobStore
from zuul.zk.change_cache import ChangeKey from zuul.zk.change_cache import ChangeKey
@ -3383,7 +3385,8 @@ class JobRequest:
# This object participates in transactions, and therefore must # This object participates in transactions, and therefore must
# remain small and unsharded. # remain small and unsharded.
def __init__(self, uuid, precedence=None, state=None, result_path=None): def __init__(self, uuid, precedence=None, state=None, result_path=None,
span_context=None):
self.uuid = uuid self.uuid = uuid
if precedence is None: if precedence is None:
self.precedence = 0 self.precedence = 0
@ -3396,6 +3399,12 @@ class JobRequest:
self.state = state self.state = state
# Path to the future result if requested # Path to the future result if requested
self.result_path = result_path self.result_path = result_path
# Reference to the parent span
if span_context:
self.span_context = span_context
else:
span = trace.get_current_span()
self.span_context = tracing.getSpanContext(span)
# ZK related data not serialized # ZK related data not serialized
self.path = None self.path = None
@ -3408,12 +3417,14 @@ class JobRequest:
"state": self.state, "state": self.state,
"precedence": self.precedence, "precedence": self.precedence,
"result_path": self.result_path, "result_path": self.result_path,
"span_context": self.span_context,
} }
def updateFromDict(self, data): def updateFromDict(self, data):
self.precedence = data["precedence"] self.precedence = data["precedence"]
self.state = data["state"] self.state = data["state"]
self.result_path = data["result_path"] self.result_path = data["result_path"]
self.span_context = data.get("span_context")
@classmethod @classmethod
def fromDict(cls, data): def fromDict(cls, data):
@ -3421,7 +3432,8 @@ class JobRequest:
data["uuid"], data["uuid"],
precedence=data["precedence"], precedence=data["precedence"],
state=data["state"], state=data["state"],
result_path=data["result_path"] result_path=data["result_path"],
span_context=data.get("span_context"),
) )
def __lt__(self, other): def __lt__(self, other):
@ -3461,8 +3473,8 @@ class MergeRequest(JobRequest):
def __init__(self, uuid, job_type, build_set_uuid, tenant_name, def __init__(self, uuid, job_type, build_set_uuid, tenant_name,
pipeline_name, event_id, precedence=None, state=None, pipeline_name, event_id, precedence=None, state=None,
result_path=None): result_path=None, span_context=None):
super().__init__(uuid, precedence, state, result_path) super().__init__(uuid, precedence, state, result_path, span_context)
self.job_type = job_type self.job_type = job_type
self.build_set_uuid = build_set_uuid self.build_set_uuid = build_set_uuid
self.tenant_name = tenant_name self.tenant_name = tenant_name
@ -3491,7 +3503,8 @@ class MergeRequest(JobRequest):
data["event_id"], data["event_id"],
precedence=data["precedence"], precedence=data["precedence"],
state=data["state"], state=data["state"],
result_path=data["result_path"] result_path=data["result_path"],
span_context=data.get("span_context"),
) )
def __repr__(self): def __repr__(self):
@ -3511,8 +3524,8 @@ class BuildRequest(JobRequest):
def __init__(self, uuid, zone, build_set_uuid, job_name, tenant_name, def __init__(self, uuid, zone, build_set_uuid, job_name, tenant_name,
pipeline_name, event_id, precedence=None, state=None, pipeline_name, event_id, precedence=None, state=None,
result_path=None): result_path=None, span_context=None):
super().__init__(uuid, precedence, state, result_path) super().__init__(uuid, precedence, state, result_path, span_context)
self.zone = zone self.zone = zone
self.build_set_uuid = build_set_uuid self.build_set_uuid = build_set_uuid
self.job_name = job_name self.job_name = job_name
@ -3549,7 +3562,8 @@ class BuildRequest(JobRequest):
data["event_id"], data["event_id"],
precedence=data["precedence"], precedence=data["precedence"],
state=data["state"], state=data["state"],
result_path=data["result_path"] result_path=data["result_path"],
span_context=data.get("span_context"),
) )
request.worker_info = data["worker_info"] request.worker_info = data["worker_info"]
@ -3607,6 +3621,7 @@ class Build(zkobject.ZKObject):
held=False, held=False,
zuul_event_id=None, zuul_event_id=None,
build_request_ref=None, build_request_ref=None,
span_info=None,
) )
def serialize(self, context): def serialize(self, context):
@ -3625,6 +3640,7 @@ class Build(zkobject.ZKObject):
"held": self.held, "held": self.held,
"zuul_event_id": self.zuul_event_id, "zuul_event_id": self.zuul_event_id,
"build_request_ref": self.build_request_ref, "build_request_ref": self.build_request_ref,
"span_info": self.span_info,
} }
if COMPONENT_REGISTRY.model_api < 5: if COMPONENT_REGISTRY.model_api < 5:
data["_result_data"] = (self._result_data.getPath() data["_result_data"] = (self._result_data.getPath()

View File

@ -42,7 +42,7 @@ from zuul.lib.monitoring import MonitoringServer
from zuul.lib.queue import NamedQueue from zuul.lib.queue import NamedQueue
from zuul.lib.times import Times from zuul.lib.times import Times
from zuul.lib.statsd import get_statsd, normalize_statsd_name from zuul.lib.statsd import get_statsd, normalize_statsd_name
from zuul.lib.tracing import Tracing from zuul.lib import tracing
import zuul.lib.queue import zuul.lib.queue
import zuul.lib.repl import zuul.lib.repl
from zuul import nodepool from zuul import nodepool
@ -191,7 +191,7 @@ class Scheduler(threading.Thread):
self.daemon = True self.daemon = True
self.wait_for_init = wait_for_init self.wait_for_init = wait_for_init
self.hostname = socket.getfqdn() self.hostname = socket.getfqdn()
self.tracing = Tracing(config) self.tracing = tracing.Tracing(config)
self.primed_event = threading.Event() self.primed_event = threading.Event()
# Wake up the main run loop # Wake up the main run loop
self.wake_event = threading.Event() self.wake_event = threading.Event()
@ -2709,6 +2709,14 @@ class Scheduler(threading.Thread):
build.result = result build.result = result
attributes = {
"uuid": build.uuid,
"job": build.job.name,
"buildset_uuid": build.build_set.item.current_build_set.uuid,
"zuul_event_id": build.build_set.item.event.zuul_event_id,
}
tracing.endSavedSpan(build.span_info, attributes=attributes)
self._reportBuildStats(build) self._reportBuildStats(build)
self._cleanupCompletedBuild(build) self._cleanupCompletedBuild(build)