diff --git a/tests/otlp_fixture.py b/tests/otlp_fixture.py index cd23294837..633296facd 100644 --- a/tests/otlp_fixture.py +++ b/tests/otlp_fixture.py @@ -11,11 +11,11 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - from concurrent import futures import fixtures import grpc +from opentelemetry import trace from opentelemetry.proto.collector.trace.v1.trace_service_pb2_grpc import ( TraceServiceServicer, add_TraceServiceServicer_to_server @@ -45,6 +45,9 @@ class OTLPFixture(fixtures.Fixture): self.server = grpc.server(self.executor) add_TraceServiceServicer_to_server(TraceServer(self), self.server) self.port = self.server.add_insecure_port('[::]:0') + # Reset global tracer provider + trace._TRACER_PROVIDER_SET_ONCE = trace.Once() + trace._TRACER_PROVIDER = None def _setUp(self): self.server.start() diff --git a/tests/unit/test_tracing.py b/tests/unit/test_tracing.py index ed64c8a7cb..3c452dd53b 100644 --- a/tests/unit/test_tracing.py +++ b/tests/unit/test_tracing.py @@ -12,8 +12,13 @@ # License for the specific language governing permissions and limitations # under the License. +import time + from tests.base import iterate_timeout, ZuulTestCase +import zuul.lib.tracing as tracing +from opentelemetry import trace + def attributes_to_dict(attrlist): ret = {} @@ -26,16 +31,169 @@ class TestTracing(ZuulTestCase): config_file = 'zuul-tracing.conf' tenant_config_file = "config/single-tenant/main.yaml" - def test_tracing(self): - self.scheds.first.sched.tracing.test() + def test_tracing_api(self): + tracer = trace.get_tracer("zuul") + + # We have a lot of timestamps stored as floats, so make sure + # our root span is a ZuulSpan that can handle that input. + span_info = tracing.startSavedSpan('parent-trace', + start_time=time.time(), + attributes={'startattr': 'bar'}, + include_attributes=True) + + # Simulate a reconstructed root span + span = tracing.restoreSpan(span_info) + + # Within the root span, use the more typical OpenTelemetry + # context manager api. + with trace.use_span(span): + with tracer.start_span('child1-trace') as child1_span: + link = trace.Link(child1_span.context, + attributes={'relationship': 'prev'}) + + # Make sure that we can manually start and stop a child span, + # and that it is a ZuulSpan as well. + with trace.use_span(span): + child = tracer.start_span('child2-trace', start_time=time.time(), + links=[link]) + child.end(end_time=time.time()) + + # Make sure that we can start a child span from a span + # context and not a full span: + span_context = tracing.getSpanContext(span) + with tracing.startSpanInContext(span_context, 'child3-trace') as child: + child.end(end_time=time.time()) + + # End our root span manually. + tracing.endSavedSpan(span_info, end_time=time.time(), + attributes={'endattr': 'baz'}) + for _ in iterate_timeout(60, "request to arrive"): - if self.otlp.requests: + if len(self.otlp.requests) == 4: break - req = self.otlp.requests[0] - self.log.debug("Received:\n%s", req) - attrs = attributes_to_dict(req.resource_spans[0].resource.attributes) + req1 = self.otlp.requests[0] + self.log.debug("Received:\n%s", req1) + attrs = attributes_to_dict(req1.resource_spans[0].resource.attributes) self.assertEqual({"service.name": "zuultest"}, attrs) self.assertEqual("zuul", - req.resource_spans[0].scope_spans[0].scope.name) - span = req.resource_spans[0].scope_spans[0].spans[0] - self.assertEqual("test-trace", span.name) + req1.resource_spans[0].scope_spans[0].scope.name) + span1 = req1.resource_spans[0].scope_spans[0].spans[0] + self.assertEqual("child1-trace", span1.name) + + req2 = self.otlp.requests[1] + self.log.debug("Received:\n%s", req2) + span2 = req2.resource_spans[0].scope_spans[0].spans[0] + self.assertEqual("child2-trace", span2.name) + self.assertEqual(span2.links[0].span_id, span1.span_id) + attrs = attributes_to_dict(span2.links[0].attributes) + self.assertEqual({"relationship": "prev"}, attrs) + + req3 = self.otlp.requests[2] + self.log.debug("Received:\n%s", req3) + span3 = req3.resource_spans[0].scope_spans[0].spans[0] + self.assertEqual("child3-trace", span3.name) + + req4 = self.otlp.requests[3] + self.log.debug("Received:\n%s", req4) + span4 = req4.resource_spans[0].scope_spans[0].spans[0] + self.assertEqual("parent-trace", span4.name) + attrs = attributes_to_dict(span4.attributes) + self.assertEqual({"startattr": "bar", + "endattr": "baz"}, attrs) + + self.assertEqual(span1.trace_id, span4.trace_id) + self.assertEqual(span2.trace_id, span4.trace_id) + self.assertEqual(span3.trace_id, span4.trace_id) + + def test_tracing_api_null(self): + tracer = trace.get_tracer("zuul") + + # Test that restoring spans and span contexts works with + # null values. + + span_info = None + # Simulate a reconstructed root span from a null value + span = tracing.restoreSpan(span_info) + + # Within the root span, use the more typical OpenTelemetry + # context manager api. + with trace.use_span(span): + with tracer.start_span('child1-trace') as child1_span: + link = trace.Link(child1_span.context, + attributes={'relationship': 'prev'}) + + # Make sure that we can manually start and stop a child span, + # and that it is a ZuulSpan as well. + with trace.use_span(span): + child = tracer.start_span('child2-trace', start_time=time.time(), + links=[link]) + child.end(end_time=time.time()) + + # Make sure that we can start a child span from a null span + # context: + span_context = None + with tracing.startSpanInContext(span_context, 'child3-trace') as child: + child.end(end_time=time.time()) + + # End our root span manually. + span.end(end_time=time.time()) + + for _ in iterate_timeout(60, "request to arrive"): + if len(self.otlp.requests) == 3: + break + req1 = self.otlp.requests[0] + self.log.debug("Received:\n%s", req1) + attrs = attributes_to_dict(req1.resource_spans[0].resource.attributes) + self.assertEqual({"service.name": "zuultest"}, attrs) + self.assertEqual("zuul", + req1.resource_spans[0].scope_spans[0].scope.name) + span1 = req1.resource_spans[0].scope_spans[0].spans[0] + self.assertEqual("child1-trace", span1.name) + + req2 = self.otlp.requests[1] + self.log.debug("Received:\n%s", req2) + span2 = req2.resource_spans[0].scope_spans[0].spans[0] + self.assertEqual("child2-trace", span2.name) + self.assertEqual(span2.links[0].span_id, span1.span_id) + attrs = attributes_to_dict(span2.links[0].attributes) + self.assertEqual({"relationship": "prev"}, attrs) + + req3 = self.otlp.requests[2] + self.log.debug("Received:\n%s", req3) + span3 = req3.resource_spans[0].scope_spans[0].spans[0] + self.assertEqual("child3-trace", span3.name) + + self.assertNotEqual(span1.trace_id, span2.trace_id) + self.assertNotEqual(span2.trace_id, span3.trace_id) + self.assertNotEqual(span1.trace_id, span3.trace_id) + + def test_tracing(self): + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.addApproval('Code-Review', 2) + self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) + self.waitUntilSettled() + + for _ in iterate_timeout(60, "request to arrive"): + if len(self.otlp.requests) >= 2: + break + + buildset = self.getSpan('BuildSet') + self.log.debug("Received:\n%s", buildset) + item = self.getSpan('QueueItem') + self.log.debug("Received:\n%s", item) + self.assertEqual(item.trace_id, buildset.trace_id) + self.assertNotEqual(item.span_id, buildset.span_id) + self.assertTrue(buildset.start_time_unix_nano >= + item.start_time_unix_nano) + self.assertTrue(buildset.end_time_unix_nano <= + item.end_time_unix_nano) + item_attrs = attributes_to_dict(item.attributes) + self.assertTrue(item_attrs['ref_number'] == "1") + self.assertTrue(item_attrs['ref_patchset'] == "1") + self.assertTrue('zuul_event_id' in item_attrs) + + def getSpan(self, name): + for req in self.otlp.requests: + span = req.resource_spans[0].scope_spans[0].spans[0] + if span.name == name: + return span diff --git a/zuul/lib/tracing.py b/zuul/lib/tracing.py index 2eb4d8903c..42b2681f36 100644 --- a/zuul/lib/tracing.py +++ b/zuul/lib/tracing.py @@ -18,12 +18,203 @@ from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import \ from opentelemetry.exporter.otlp.proto.http.trace_exporter import \ OTLPSpanExporter as HTTPExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace import TracerProvider, Span from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry import trace +from opentelemetry.sdk import trace as trace_sdk from zuul.lib.config import get_default, any_to_bool +class ZuulSpan(Span): + """An implementation of Span which accepts floating point + times and converts them to the expected nanoseconds.""" + + def start(self, start_time=None, parent_context=None): + if isinstance(start_time, float): + start_time = int(start_time * (10**9)) + return super().start(start_time, parent_context) + + def end(self, end_time=None): + if isinstance(end_time, float): + end_time = int(end_time * (10**9)) + return super().end(end_time) + + +# Patch the OpenTelemetry SDK Span class to return a ZuulSpan so that +# we can supply floating point timestamps. +trace_sdk._Span = ZuulSpan + + +def _formatContext(context): + return { + 'trace_id': context.trace_id, + 'span_id': context.span_id, + } + + +def _formatAttributes(attrs): + if attrs is None: + return None + return attrs.copy() + + +def getSpanInfo(span, include_attributes=False): + """Return a dict for use in serializing a Span.""" + links = [{'context': _formatContext(l.context), + 'attributes': _formatAttributes(l.attributes)} + for l in span.links] + attrs = _formatAttributes(span.attributes) + context = span.get_span_context() + ret = { + 'name': span.name, + 'trace_id': context.trace_id, + 'span_id': context.span_id, + 'trace_flags': context.trace_flags, + 'start_time': span.start_time, + } + if links: + ret['links'] = links + if attrs: + if not include_attributes: + # Avoid setting attributes when we start saved spans + # because we have to store them in ZooKeeper and we should + # minimize what we store there (especially since it is + # usually duplicative). If you really need to set + # attributes at the start of a span (because the info is + # not available later), set include_attributes to True. + # Otherwise, we raise an error here to remind ourselves to + # avoid that programming pattern. + raise RuntimeError("Attributes were set on a saved span; " + "either set them when ending the span, " + "or set include_attributes=True") + ret['attributes'] = attrs + return ret + + +def restoreSpan(span_info, is_remote=True): + """Restore a Span from the serialized dict provided by getSpanInfo + + Return None if unable to serialize the span. + """ + tracer = trace.get_tracer("zuul") + if span_info is None: + return trace.INVALID_SPAN + required_keys = {'name', 'trace_id', 'span_id', 'trace_flags'} + if not required_keys <= set(span_info.keys()): + return trace.INVALID_SPAN + span_context = trace.SpanContext( + span_info['trace_id'], + span_info['span_id'], + is_remote=is_remote, + trace_flags=trace.TraceFlags(span_info['trace_flags']), + ) + links = [] + for link_info in span_info.get('links', []): + link_context = trace.SpanContext( + link_info['context']['trace_id'], + link_info['context']['span_id']) + link = trace.Link(link_context, link_info['attributes']) + links.append(link) + attributes = span_info.get('attributes', {}) + + span = ZuulSpan( + name=span_info['name'], + context=span_context, + parent=None, + sampler=tracer.sampler, + resource=tracer.resource, + attributes=attributes, + span_processor=tracer.span_processor, + kind=trace.SpanKind.INTERNAL, + links=links, + instrumentation_info=tracer.instrumentation_info, + record_exception=False, + set_status_on_exception=True, + limits=tracer._span_limits, + instrumentation_scope=tracer._instrumentation_scope, + ) + span.start(start_time=span_info['start_time']) + + return span + + +def startSavedSpan(*args, **kw): + """Start a span and serialize it + + This is a convenience method which starts a span (either root + or child) and immediately serializes it. + + Most spans in Zuul should use this method. + """ + tracer = trace.get_tracer("zuul") + include_attributes = kw.pop('include_attributes', False) + span = tracer.start_span(*args, **kw) + return getSpanInfo(span, include_attributes) + + +def endSavedSpan(span_info, end_time=None, attributes=None): + """End a saved span. + + This is a convenience method to restore a saved span and + immediately end it. + + Most spans in Zuul should use this method. + """ + span = restoreSpan(span_info, is_remote=False) + if span: + if attributes: + span.set_attributes(attributes) + span.end(end_time=end_time) + + +def getSpanContext(span): + """Return a dict for use in serializing a Span Context. + + The span context information used here is a lightweight + encoding of the span information so that remote child spans + can be started without access to a fully restored parent. + This is equivalent to (but not the same format) as the + OpenTelemetry trace context propogator. + """ + context = span.get_span_context() + return { + 'trace_id': context.trace_id, + 'span_id': context.span_id, + 'trace_flags': context.trace_flags, + } + + +def restoreSpanContext(span_context): + """Return a span with remote context information from getSpanContext. + + This returns a non-recording span to use as a parent span. It + avoids the necessity of fully restoring the parent span. + """ + if span_context: + span_context = trace.SpanContext( + trace_id=span_context['trace_id'], + span_id=span_context['span_id'], + is_remote=True, + trace_flags=trace.TraceFlags(span_context['trace_flags']) + ) + else: + span_context = trace.INVALID_SPAN_CONTEXT + parent = trace.NonRecordingSpan(span_context) + return parent + + +def startSpanInContext(span_context, *args, **kw): + """Start a span in a saved context. + + This restores a span from a saved context and starts a new child span. + """ + tracer = trace.get_tracer("zuul") + parent = restoreSpanContext(span_context) + with trace.use_span(parent): + return tracer.start_span(*args, **kw) + + class Tracing: PROTOCOL_GRPC = 'grpc' PROTOCOL_HTTP_PROTOBUF = 'http/protobuf' @@ -33,10 +224,10 @@ class Tracing: service_name = get_default(config, "tracing", "service_name", "zuul") resource = Resource(attributes={SERVICE_NAME: service_name}) provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) enabled = get_default(config, "tracing", "enabled") if not any_to_bool(enabled): self.processor = None - self.tracer = provider.get_tracer("zuul") return protocol = get_default(config, "tracing", "protocol", @@ -93,16 +284,8 @@ class Tracing: raise Exception(f"Unknown tracing protocol {protocol}") self.processor = self.processor_class(exporter) provider.add_span_processor(self.processor) - self.tracer = provider.get_tracer("zuul") def stop(self): if not self.processor: return self.processor.shutdown() - - def test(self): - # TODO: remove once we have actual traces - if not self.tracer: - return - with self.tracer.start_as_current_span('test-trace'): - pass diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 365435f3db..cd9b2381db 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -22,6 +22,7 @@ from zuul import model from zuul.lib.dependson import find_dependency_headers from zuul.lib.logutil import get_annotated_logger from zuul.lib.tarjan import strongly_connected_components +import zuul.lib.tracing as tracing from zuul.model import ( Change, DequeueEvent, PipelineState, PipelineChangeList, QueueItem, PipelinePostConfigEvent, @@ -30,6 +31,8 @@ from zuul.zk.change_cache import ChangeKey from zuul.zk.components import COMPONENT_REGISTRY from zuul.zk.locks import pipeline_lock +from opentelemetry import trace + class DynamicChangeQueueContextManager(object): def __init__(self, change_queue): @@ -579,7 +582,13 @@ class PipelineManager(metaclass=ABCMeta): log.info("Adding change %s to queue %s in %s" % (change, change_queue, self.pipeline)) - item = change_queue.enqueueChange(change, event) + if enqueue_time is None: + enqueue_time = time.time() + span_info = tracing.startSavedSpan( + 'QueueItem', start_time=enqueue_time) + item = change_queue.enqueueChange(change, event, + span_info=span_info, + enqueue_time=enqueue_time) self.updateBundle(item, change_queue, cycle) with item.activeContext(self.current_context): @@ -739,6 +748,15 @@ class PipelineManager(metaclass=ABCMeta): self.reportDequeue(item) item.queue.dequeueItem(item) + span_attrs = { + 'zuul_event_id': item.event.zuul_event_id, + } + for k, v in item.change.getSafeAttributes().toDict().items(): + span_attrs['ref_' + k] = v + tracing.endSavedSpan(item.current_build_set.span_info) + tracing.endSavedSpan(item.span_info, + attributes=span_attrs) + def removeItem(self, item): log = get_annotated_logger(self.log, item.event) # Remove an item from the queue, probably because it has been @@ -964,6 +982,7 @@ class PipelineManager(metaclass=ABCMeta): self.reportNormalBuildsetEnd( item.current_build_set, 'dequeue', final=False, result='DEQUEUED') + tracing.endSavedSpan(item.current_build_set.span_info) item.resetAllBuilds() for item_behind in item.items_behind: @@ -1337,7 +1356,9 @@ class PipelineManager(metaclass=ABCMeta): # isn't already set. tpc = tenant.project_configs.get(item.change.project.canonical_name) if not build_set.ref: - build_set.setConfiguration(self.current_context) + with trace.use_span(tracing.restoreSpan(item.span_info)): + span_info = tracing.startSavedSpan('BuildSet') + build_set.setConfiguration(self.current_context, span_info) # Next, if a change ahead has a broken config, then so does # this one. Record that and don't do anything else. diff --git a/zuul/model.py b/zuul/model.py index 5aaa22a5f2..254556fda5 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -329,6 +329,9 @@ class Attributes(object): def __init__(self, **kw): setattr(self, '__dict__', kw) + def toDict(self): + return self.__dict__ + class Freezable(object): """A mix-in class so that an object can be made immutable""" @@ -1094,13 +1097,16 @@ class ChangeQueue(zkobject.ZKObject): def matches(self, project_cname, branch): return (project_cname, branch) in self.project_branches - def enqueueChange(self, change, event): + def enqueueChange(self, change, event, span_info=None, enqueue_time=None): + if enqueue_time is None: + enqueue_time = time.time() item = QueueItem.new(self.zk_context, queue=self, pipeline=self.pipeline, change=change, event=event, - enqueue_time=time.time()) + span_info=span_info, + enqueue_time=enqueue_time) self.enqueueItem(item) return item @@ -3882,6 +3888,7 @@ class BuildSet(zkobject.ZKObject): tries={}, files_state=self.NEW, repo_state_state=self.NEW, + span_info=None, configured=False, configured_time=None, # When setConfigured was called start_time=None, # When the buildset reported start @@ -3997,6 +4004,7 @@ class BuildSet(zkobject.ZKObject): "fail_fast": self.fail_fast, "job_graph": (self.job_graph.toDict() if self.job_graph else None), + "span_info": self.span_info, "configured_time": self.configured_time, "start_time": self.start_time, "repo_state_request_time": self.repo_state_request_time, @@ -4147,7 +4155,7 @@ class BuildSet(zkobject.ZKObject): len(self.builds), self.getStateName(self.merge_state)) - def setConfiguration(self, context): + def setConfiguration(self, context, span_info): with self.activeContext(context): # The change isn't enqueued until after it's created # so we don't know what the other changes ahead will be @@ -4167,6 +4175,7 @@ class BuildSet(zkobject.ZKObject): self.merger_items = [i.makeMergerItem() for i in items] self.configured = True self.configured_time = time.time() + self.span_info = span_info def _toChangeDict(self, item): # Inject bundle_id to dict if available, this can be used to decide @@ -4336,6 +4345,7 @@ class QueueItem(zkobject.ZKObject): current_build_set=None, item_ahead=None, items_behind=[], + span_info=None, enqueue_time=None, report_time=None, dequeue_time=None, @@ -4405,6 +4415,7 @@ class QueueItem(zkobject.ZKObject): self.current_build_set.getPath()), "item_ahead": self.item_ahead and self.item_ahead.getPath(), "items_behind": [i.getPath() for i in self.items_behind], + "span_info": self.span_info, "enqueue_time": self.enqueue_time, "report_time": self.report_time, "dequeue_time": self.dequeue_time,