Merge "Tracing: implement span save/restore"

changes/52/859152/1
Zuul 7 days ago committed by Gerrit Code Review
commit 2bc750ac70
  1. 5
      tests/otlp_fixture.py
  2. 176
      tests/unit/test_tracing.py
  3. 203
      zuul/lib/tracing.py
  4. 25
      zuul/manager/__init__.py
  5. 17
      zuul/model.py

@ -11,11 +11,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from concurrent import futures
import fixtures
import grpc
from opentelemetry import trace
from opentelemetry.proto.collector.trace.v1.trace_service_pb2_grpc import (
TraceServiceServicer,
add_TraceServiceServicer_to_server
@ -45,6 +45,9 @@ class OTLPFixture(fixtures.Fixture):
self.server = grpc.server(self.executor)
add_TraceServiceServicer_to_server(TraceServer(self), self.server)
self.port = self.server.add_insecure_port('[::]:0')
# Reset global tracer provider
trace._TRACER_PROVIDER_SET_ONCE = trace.Once()
trace._TRACER_PROVIDER = None
def _setUp(self):
self.server.start()

@ -12,8 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
from tests.base import iterate_timeout, ZuulTestCase
import zuul.lib.tracing as tracing
from opentelemetry import trace
def attributes_to_dict(attrlist):
ret = {}
@ -26,16 +31,169 @@ class TestTracing(ZuulTestCase):
config_file = 'zuul-tracing.conf'
tenant_config_file = "config/single-tenant/main.yaml"
def test_tracing(self):
    """Invoke the tracing self-test hook on the first scheduler."""
    scheduler = self.scheds.first.sched
    scheduler.tracing.test()
def test_tracing_api(self):
    """Exercise the saved-span helpers end to end.

    A saved root span is restored, three children are created beneath
    it through three different APIs, and the four export requests that
    result are checked for names, links, attributes and a shared
    trace id.
    """
    zuul_tracer = trace.get_tracer("zuul")

    def first_span(request):
        # The assertions below only look at the first span carried by
        # each export request.
        return request.resource_spans[0].scope_spans[0].spans[0]

    # Many of our timestamps are stored as floats, so start the root
    # span with one to make sure it is a ZuulSpan that accepts them.
    root_info = tracing.startSavedSpan(
        'parent-trace',
        start_time=time.time(),
        attributes={'startattr': 'bar'},
        include_attributes=True)

    # Simulate a reconstructed root span.
    root = tracing.restoreSpan(root_info)

    # Child 1: the more typical OpenTelemetry context manager api.
    with trace.use_span(root), \
            zuul_tracer.start_span('child1-trace') as first_child:
        prev_link = trace.Link(first_child.context,
                               attributes={'relationship': 'prev'})

    # Child 2: manually started and stopped, again with float
    # timestamps, and linked to child 1.
    with trace.use_span(root):
        second_child = zuul_tracer.start_span(
            'child2-trace', start_time=time.time(), links=[prev_link])
        second_child.end(end_time=time.time())

    # Child 3: started from a span context and not a full span.
    root_context = tracing.getSpanContext(root)
    with tracing.startSpanInContext(
            root_context, 'child3-trace') as third_child:
        third_child.end(end_time=time.time())

    # End the root span manually through the saved-span helper.
    tracing.endSavedSpan(root_info, end_time=time.time(),
                         attributes={'endattr': 'baz'})

    for _ in iterate_timeout(60, "request to arrive"):
        if len(self.otlp.requests) == 4:
            break

    req1 = self.otlp.requests[0]
    self.log.debug("Received:\n%s", req1)
    resource_entry = req1.resource_spans[0]
    self.assertEqual(
        {"service.name": "zuultest"},
        attributes_to_dict(resource_entry.resource.attributes))
    self.assertEqual("zuul", resource_entry.scope_spans[0].scope.name)
    span1 = first_span(req1)
    self.assertEqual("child1-trace", span1.name)

    req2 = self.otlp.requests[1]
    self.log.debug("Received:\n%s", req2)
    span2 = first_span(req2)
    self.assertEqual("child2-trace", span2.name)
    received_link = span2.links[0]
    self.assertEqual(received_link.span_id, span1.span_id)
    self.assertEqual({"relationship": "prev"},
                     attributes_to_dict(received_link.attributes))

    req3 = self.otlp.requests[2]
    self.log.debug("Received:\n%s", req3)
    span3 = first_span(req3)
    self.assertEqual("child3-trace", span3.name)

    req4 = self.otlp.requests[3]
    self.log.debug("Received:\n%s", req4)
    span4 = first_span(req4)
    self.assertEqual("parent-trace", span4.name)
    self.assertEqual({"startattr": "bar",
                      "endattr": "baz"},
                     attributes_to_dict(span4.attributes))

    # Every child must belong to the root span's trace.
    self.assertEqual(span1.trace_id, span4.trace_id)
    self.assertEqual(span2.trace_id, span4.trace_id)
    self.assertEqual(span3.trace_id, span4.trace_id)
def test_tracing_api_null(self):
    """Check that the restore helpers tolerate null inputs.

    Restoring a span from ``None`` and starting a span in a ``None``
    context must both work.  Three child spans are created; the test
    expects three export requests and asserts that the three spans
    do not share a trace id.
    """
    tracer = trace.get_tracer("zuul")

    # Test that restoring spans and span contexts works with
    # null values.
    span_info = None

    # Simulate a reconstructed root span from a null value
    span = tracing.restoreSpan(span_info)

    # Within the root span, use the more typical OpenTelemetry
    # context manager api.
    with trace.use_span(span):
        with tracer.start_span('child1-trace') as child1_span:
            link = trace.Link(child1_span.context,
                              attributes={'relationship': 'prev'})

    # Make sure that we can manually start and stop a child span,
    # and that it is a ZuulSpan as well.
    with trace.use_span(span):
        child = tracer.start_span('child2-trace', start_time=time.time(),
                                  links=[link])
        child.end(end_time=time.time())

    # Make sure that we can start a child span from a null span
    # context:
    span_context = None
    with tracing.startSpanInContext(span_context, 'child3-trace') as child:
        child.end(end_time=time.time())

    # End our root span manually.
    span.end(end_time=time.time())

    # Only the three children are expected to be exported.
    for _ in iterate_timeout(60, "request to arrive"):
        if len(self.otlp.requests) == 3:
            break

    req1 = self.otlp.requests[0]
    self.log.debug("Received:\n%s", req1)
    attrs = attributes_to_dict(req1.resource_spans[0].resource.attributes)
    self.assertEqual({"service.name": "zuultest"}, attrs)
    self.assertEqual("zuul",
                     req1.resource_spans[0].scope_spans[0].scope.name)
    span1 = req1.resource_spans[0].scope_spans[0].spans[0]
    self.assertEqual("child1-trace", span1.name)

    req2 = self.otlp.requests[1]
    self.log.debug("Received:\n%s", req2)
    span2 = req2.resource_spans[0].scope_spans[0].spans[0]
    self.assertEqual("child2-trace", span2.name)
    self.assertEqual(span2.links[0].span_id, span1.span_id)
    attrs = attributes_to_dict(span2.links[0].attributes)
    self.assertEqual({"relationship": "prev"}, attrs)

    req3 = self.otlp.requests[2]
    self.log.debug("Received:\n%s", req3)
    span3 = req3.resource_spans[0].scope_spans[0].spans[0]
    self.assertEqual("child3-trace", span3.name)

    # With no real parent, the children must not share a trace.
    self.assertNotEqual(span1.trace_id, span2.trace_id)
    self.assertNotEqual(span2.trace_id, span3.trace_id)
    self.assertNotEqual(span1.trace_id, span3.trace_id)
def test_tracing(self):
    """Check the QueueItem and BuildSet spans emitted for a change.

    A change is approved so that it is enqueued, then the test waits
    for both spans to be exported and checks that they share a trace,
    that the buildset's time range lies within the item's, and that
    the item span carries the expected attributes.
    """
    A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
    A.addApproval('Code-Review', 2)
    self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
    self.waitUntilSettled()

    # Wait for the two spans we actually need rather than for a raw
    # request count; getSpan() returns None until a span has arrived,
    # and dereferencing None would hide the real failure.
    for _ in iterate_timeout(60, "request to arrive"):
        buildset = self.getSpan('BuildSet')
        item = self.getSpan('QueueItem')
        if buildset is not None and item is not None:
            break

    self.log.debug("Received:\n%s", buildset)
    self.log.debug("Received:\n%s", item)

    self.assertEqual(item.trace_id, buildset.trace_id)
    self.assertNotEqual(item.span_id, buildset.span_id)
    # Specific assertion methods report both operands on failure.
    self.assertGreaterEqual(buildset.start_time_unix_nano,
                            item.start_time_unix_nano)
    self.assertLessEqual(buildset.end_time_unix_nano,
                         item.end_time_unix_nano)

    item_attrs = attributes_to_dict(item.attributes)
    self.assertEqual("1", item_attrs['ref_number'])
    self.assertEqual("1", item_attrs['ref_patchset'])
    self.assertIn('zuul_event_id', item_attrs)
def getSpan(self, name):
    """Return the first received span called *name*, or None.

    Only the first span of each received request is considered.
    """
    leading_spans = (
        request.resource_spans[0].scope_spans[0].spans[0]
        for request in self.otlp.requests
    )
    return next(
        (candidate for candidate in leading_spans
         if candidate.name == name),
        None)

@ -18,12 +18,203 @@ from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import \
from opentelemetry.exporter.otlp.proto.http.trace_exporter import \
OTLPSpanExporter as HTTPExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace import TracerProvider, Span
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry import trace
from opentelemetry.sdk import trace as trace_sdk
from zuul.lib.config import get_default, any_to_bool
class ZuulSpan(Span):
    """A Span that also accepts floating point timestamps.

    Float values passed to :meth:`start` or :meth:`end` are multiplied
    by 10**9 and truncated to an int (the nanoseconds the base class
    expects) before being handed to the parent implementation.  Any
    other value, including None, is passed through unchanged.
    """

    def start(self, start_time=None, parent_context=None):
        """Start the span, converting a float ``start_time`` first."""
        begin = start_time
        if isinstance(begin, float):
            begin = int(begin * 1_000_000_000)
        return super().start(begin, parent_context)

    def end(self, end_time=None):
        """End the span, converting a float ``end_time`` first."""
        finish = end_time
        if isinstance(finish, float):
            finish = int(finish * 1_000_000_000)
        return super().end(finish)
# Patch the OpenTelemetry SDK Span class to return a ZuulSpan so that
# we can supply floating point timestamps.  Note that this rebinds a
# private (underscore-prefixed) name in the SDK's trace module.
trace_sdk._Span = ZuulSpan
def _formatContext(context):
    """Return the trace id and span id of *context* as a plain dict."""
    return dict(trace_id=context.trace_id, span_id=context.span_id)
def _formatAttributes(attrs):
    """Return ``attrs.copy()``, or None when *attrs* is None."""
    return None if attrs is None else attrs.copy()
def getSpanInfo(span, include_attributes=False):
    """Return a dict for use in serializing a Span.

    The dict always holds the span's name, trace id, span id, trace
    flags and start time.  A ``links`` entry is added when the span
    has links, and an ``attributes`` entry when it has attributes and
    *include_attributes* is true.

    Raises RuntimeError if the span has attributes but
    *include_attributes* is false.
    """
    serialized_links = []
    for span_link in span.links:
        serialized_links.append({
            'context': _formatContext(span_link.context),
            'attributes': _formatAttributes(span_link.attributes),
        })
    span_attrs = _formatAttributes(span.attributes)
    span_ctx = span.get_span_context()

    info = dict(
        name=span.name,
        trace_id=span_ctx.trace_id,
        span_id=span_ctx.span_id,
        trace_flags=span_ctx.trace_flags,
        start_time=span.start_time,
    )
    if serialized_links:
        info['links'] = serialized_links

    if not span_attrs:
        return info

    if not include_attributes:
        # Saved spans are stored in ZooKeeper, where we want to keep
        # the payload small (and start-time attributes are usually
        # duplicative).  Attributes should normally be supplied when
        # the span is ended; when the information really is only
        # available at start time, pass include_attributes=True.
        # Failing loudly here discourages the pattern by default.
        raise RuntimeError("Attributes were set on a saved span; "
                           "either set them when ending the span, "
                           "or set include_attributes=True")
    info['attributes'] = span_attrs
    return info
def restoreSpan(span_info, is_remote=True):
    """Restore a Span from the serialized dict provided by getSpanInfo

    span_info is the dict produced by getSpanInfo, or None.
    is_remote is used as the ``is_remote`` flag of the restored span
    context and of the contexts of any restored links.

    Returns a started ZuulSpan, or ``trace.INVALID_SPAN`` if span_info
    is None or lacks any of the required keys.  Raises KeyError if the
    required keys are present but ``start_time`` is not.
    """
    tracer = trace.get_tracer("zuul")
    if span_info is None:
        return trace.INVALID_SPAN
    required_keys = {'name', 'trace_id', 'span_id', 'trace_flags'}
    if not required_keys <= set(span_info.keys()):
        return trace.INVALID_SPAN
    span_context = trace.SpanContext(
        span_info['trace_id'],
        span_info['span_id'],
        is_remote=is_remote,
        trace_flags=trace.TraceFlags(span_info['trace_flags']),
    )
    links = []
    for link_info in span_info.get('links', []):
        # SpanContext requires is_remote; omitting it made restoring
        # any span with links fail.  The serialized link carries no
        # such flag, so reuse the one supplied by the caller.
        link_context = trace.SpanContext(
            link_info['context']['trace_id'],
            link_info['context']['span_id'],
            is_remote=is_remote)
        links.append(trace.Link(link_context, link_info['attributes']))
    attributes = span_info.get('attributes', {})

    # Build the span directly (rather than via tracer.start_span) so
    # that it keeps the saved trace and span ids; the remaining
    # settings are borrowed from the "zuul" tracer.
    span = ZuulSpan(
        name=span_info['name'],
        context=span_context,
        parent=None,
        sampler=tracer.sampler,
        resource=tracer.resource,
        attributes=attributes,
        span_processor=tracer.span_processor,
        kind=trace.SpanKind.INTERNAL,
        links=links,
        instrumentation_info=tracer.instrumentation_info,
        record_exception=False,
        set_status_on_exception=True,
        limits=tracer._span_limits,
        instrumentation_scope=tracer._instrumentation_scope,
    )
    span.start(start_time=span_info['start_time'])
    return span
def startSavedSpan(*args, **kw):
    """Start a span and return its serialized form.

    All arguments are forwarded to the "zuul" tracer's ``start_span``
    except the optional ``include_attributes`` keyword (default
    False), which is passed to getSpanInfo instead.

    Most spans in Zuul should use this method.
    """
    want_attributes = kw.pop('include_attributes', False)
    new_span = trace.get_tracer("zuul").start_span(*args, **kw)
    return getSpanInfo(new_span, want_attributes)
def endSavedSpan(span_info, end_time=None, attributes=None):
    """Restore a saved span and immediately end it.

    If *attributes* is non-empty it is set on the restored span
    before the span is ended with *end_time*.

    Most spans in Zuul should use this method.
    """
    restored = restoreSpan(span_info, is_remote=False)
    if not restored:
        return
    if attributes:
        restored.set_attributes(attributes)
    restored.end(end_time=end_time)
def getSpanContext(span):
    """Return a dict for use in serializing a Span Context.

    This is a lightweight encoding of the span (trace id, span id and
    trace flags only) so that remote child spans can be started
    without access to a fully restored parent.

    It serves the same purpose as the OpenTelemetry trace context
    propagator, but does not use the same format.
    """
    ctx = span.get_span_context()
    fields = ('trace_id', 'span_id', 'trace_flags')
    return {field: getattr(ctx, field) for field in fields}
def restoreSpanContext(span_context):
    """Return a non-recording parent span built from getSpanContext data.

    A truthy *span_context* dict is turned into a remote SpanContext;
    a falsy one (for example None) yields the invalid span context.
    Either way the context is wrapped in a NonRecordingSpan, which
    avoids having to fully restore the parent span.
    """
    if not span_context:
        return trace.NonRecordingSpan(trace.INVALID_SPAN_CONTEXT)
    remote_context = trace.SpanContext(
        trace_id=span_context['trace_id'],
        span_id=span_context['span_id'],
        is_remote=True,
        trace_flags=trace.TraceFlags(span_context['trace_flags'])
    )
    return trace.NonRecordingSpan(remote_context)
def startSpanInContext(span_context, *args, **kw):
    """Start a new child span beneath a saved span context.

    The parent is rebuilt with restoreSpanContext and made current
    while the "zuul" tracer starts the new span; the remaining
    arguments are forwarded to ``start_span``.
    """
    zuul_tracer = trace.get_tracer("zuul")
    parent_span = restoreSpanContext(span_context)
    with trace.use_span(parent_span):
        child_span = zuul_tracer.start_span(*args, **kw)
    return child_span
class Tracing:
PROTOCOL_GRPC = 'grpc'
PROTOCOL_HTTP_PROTOBUF = 'http/protobuf'
@ -33,10 +224,10 @@ class Tracing:
service_name = get_default(config, "tracing", "service_name", "zuul")
resource = Resource(attributes={SERVICE_NAME: service_name})
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)
enabled = get_default(config, "tracing", "enabled")
if not any_to_bool(enabled):
self.processor = None
self.tracer = provider.get_tracer("zuul")
return
protocol = get_default(config, "tracing", "protocol",
@ -93,16 +284,8 @@ class Tracing:
raise Exception(f"Unknown tracing protocol {protocol}")
self.processor = self.processor_class(exporter)
provider.add_span_processor(self.processor)
self.tracer = provider.get_tracer("zuul")
def stop(self):
    """Shut down the span processor, if there is one."""
    processor = self.processor
    if processor:
        processor.shutdown()
def test(self):
    """Emit an empty 'test-trace' span when a tracer is available."""
    # TODO: remove once we have actual traces
    if self.tracer:
        with self.tracer.start_as_current_span('test-trace'):
            pass

@ -22,6 +22,7 @@ from zuul import model
from zuul.lib.dependson import find_dependency_headers
from zuul.lib.logutil import get_annotated_logger
from zuul.lib.tarjan import strongly_connected_components
import zuul.lib.tracing as tracing
from zuul.model import (
Change, DequeueEvent, PipelineState, PipelineChangeList, QueueItem,
PipelinePostConfigEvent,
@ -30,6 +31,8 @@ from zuul.zk.change_cache import ChangeKey
from zuul.zk.components import COMPONENT_REGISTRY
from zuul.zk.locks import pipeline_lock
from opentelemetry import trace
class DynamicChangeQueueContextManager(object):
def __init__(self, change_queue):
@ -579,7 +582,13 @@ class PipelineManager(metaclass=ABCMeta):
log.info("Adding change %s to queue %s in %s" %
(change, change_queue, self.pipeline))
item = change_queue.enqueueChange(change, event)
if enqueue_time is None:
enqueue_time = time.time()
span_info = tracing.startSavedSpan(
'QueueItem', start_time=enqueue_time)
item = change_queue.enqueueChange(change, event,
span_info=span_info,
enqueue_time=enqueue_time)
self.updateBundle(item, change_queue, cycle)
with item.activeContext(self.current_context):
@ -739,6 +748,15 @@ class PipelineManager(metaclass=ABCMeta):
self.reportDequeue(item)
item.queue.dequeueItem(item)
span_attrs = {
'zuul_event_id': item.event.zuul_event_id,
}
for k, v in item.change.getSafeAttributes().toDict().items():
span_attrs['ref_' + k] = v
tracing.endSavedSpan(item.current_build_set.span_info)
tracing.endSavedSpan(item.span_info,
attributes=span_attrs)
def removeItem(self, item):
log = get_annotated_logger(self.log, item.event)
# Remove an item from the queue, probably because it has been
@ -964,6 +982,7 @@ class PipelineManager(metaclass=ABCMeta):
self.reportNormalBuildsetEnd(
item.current_build_set, 'dequeue', final=False,
result='DEQUEUED')
tracing.endSavedSpan(item.current_build_set.span_info)
item.resetAllBuilds()
for item_behind in item.items_behind:
@ -1337,7 +1356,9 @@ class PipelineManager(metaclass=ABCMeta):
# isn't already set.
tpc = tenant.project_configs.get(item.change.project.canonical_name)
if not build_set.ref:
build_set.setConfiguration(self.current_context)
with trace.use_span(tracing.restoreSpan(item.span_info)):
span_info = tracing.startSavedSpan('BuildSet')
build_set.setConfiguration(self.current_context, span_info)
# Next, if a change ahead has a broken config, then so does
# this one. Record that and don't do anything else.

@ -329,6 +329,9 @@ class Attributes(object):
def __init__(self, **kw):
setattr(self, '__dict__', kw)
def toDict(self):
    """Return the instance's own attribute dict (not a copy)."""
    return vars(self)
class Freezable(object):
"""A mix-in class so that an object can be made immutable"""
@ -1094,13 +1097,16 @@ class ChangeQueue(zkobject.ZKObject):
def matches(self, project_cname, branch):
    """Tell whether (project_cname, branch) is in project_branches."""
    candidate = (project_cname, branch)
    return candidate in self.project_branches
def enqueueChange(self, change, event, span_info=None, enqueue_time=None):
    """Create a QueueItem for *change* and add it to this queue.

    :param change: the change to enqueue.
    :param event: the event associated with the change.
    :param span_info: optional saved tracing span data to store on
        the new item.
    :param enqueue_time: optional enqueue timestamp; defaults to the
        current time so that a caller which has already started a
        span can give the item the same timestamp.
    :returns: the newly created QueueItem.
    """
    if enqueue_time is None:
        enqueue_time = time.time()
    item = QueueItem.new(self.zk_context,
                         queue=self,
                         pipeline=self.pipeline,
                         change=change,
                         event=event,
                         span_info=span_info,
                         enqueue_time=enqueue_time)
    self.enqueueItem(item)
    return item
@ -3882,6 +3888,7 @@ class BuildSet(zkobject.ZKObject):
tries={},
files_state=self.NEW,
repo_state_state=self.NEW,
span_info=None,
configured=False,
configured_time=None, # When setConfigured was called
start_time=None, # When the buildset reported start
@ -3997,6 +4004,7 @@ class BuildSet(zkobject.ZKObject):
"fail_fast": self.fail_fast,
"job_graph": (self.job_graph.toDict()
if self.job_graph else None),
"span_info": self.span_info,
"configured_time": self.configured_time,
"start_time": self.start_time,
"repo_state_request_time": self.repo_state_request_time,
@ -4147,7 +4155,7 @@ class BuildSet(zkobject.ZKObject):
len(self.builds),
self.getStateName(self.merge_state))
def setConfiguration(self, context):
def setConfiguration(self, context, span_info):
with self.activeContext(context):
# The change isn't enqueued until after it's created
# so we don't know what the other changes ahead will be
@ -4167,6 +4175,7 @@ class BuildSet(zkobject.ZKObject):
self.merger_items = [i.makeMergerItem() for i in items]
self.configured = True
self.configured_time = time.time()
self.span_info = span_info
def _toChangeDict(self, item):
# Inject bundle_id to dict if available, this can be used to decide
@ -4336,6 +4345,7 @@ class QueueItem(zkobject.ZKObject):
current_build_set=None,
item_ahead=None,
items_behind=[],
span_info=None,
enqueue_time=None,
report_time=None,
dequeue_time=None,
@ -4405,6 +4415,7 @@ class QueueItem(zkobject.ZKObject):
self.current_build_set.getPath()),
"item_ahead": self.item_ahead and self.item_ahead.getPath(),
"items_behind": [i.getPath() for i in self.items_behind],
"span_info": self.span_info,
"enqueue_time": self.enqueue_time,
"report_time": self.report_time,
"dequeue_time": self.dequeue_time,

Loading…
Cancel
Save