Merge "Add support for configuring and testing tracing"

This commit is contained in:
Zuul 2022-09-22 22:36:22 +00:00 committed by Gerrit Code Review
commit 6fa84faf3f
10 changed files with 365 additions and 0 deletions

View File

@ -12,5 +12,6 @@ Service Administration
operation
authentication
monitoring
tracing
client
troubleshooting

View File

@ -61,6 +61,81 @@ Statsd
If present, this will be prefixed to all of the keys before
transmitting to the statsd server.
Tracing
~~~~~~~
.. attr:: tracing
Information about the optional OpenTelemetry tracing configuration.
See :ref:`tracing` for more information.
.. attr:: enabled
:required:
To enable tracing, set this value to ``true``. This is the only
required parameter in order to export to a collector running
locally.
.. attr:: protocol
:default: grpc
The OTLP wire protocol to use.
.. value:: grpc
Use gRPC (the default).
.. value:: http/protobuf
Use HTTP with protobuf encoding.
.. attr:: endpoint
The endpoint to use. The default endpoint is protocol-specific,
but it refers to localhost in all cases.
.. attr:: service_name
:default: zuul
The service name may be specified here. Multiple Zuul
installations should use different values.
.. attr:: tls_cert
The path to the PEM encoded certificate file. Used only by
:value:`tracing.protocol.grpc`.
.. attr:: tls_key
The path to the PEM encoded key file. Used only by
:value:`tracing.protocol.grpc`.
.. attr:: tls_ca
The path to the PEM encoded CA certificate file. Used only by
:value:`tracing.protocol.grpc`.
.. attr:: certificate_file
The path to the PEM encoded certificate file used to verify the
endpoint. Used only by :value:`tracing.protocol.http/protobuf`.
.. attr:: insecure
Whether to allow an insecure connection. Used only by
:value:`tracing.protocol.grpc`.
.. attr:: timeout
:default: 10000
The timeout for outgoing data in milliseconds.
.. attr:: compression
The compression algorithm to use. Available values depend on
the protocol and endpoint. The only universally supported value
is ``gzip``.
ZooKeeper
~~~~~~~~~

23
doc/source/tracing.rst Normal file
View File

@ -0,0 +1,23 @@
:title: Tracing
.. _tracing:
Tracing
=======
Zuul includes support for `distributed tracing`_ as described by the
OpenTelemetry project. This allows operators (and potentially users)
to visualize the progress of events and queue items through the
various Zuul components as an aid to debugging.
OpenTelemetry defines several observability signals such as traces,
metrics, and logs. Zuul uses other systems for metrics and logs; only
traces are exported via OpenTelemetry.
Zuul supports the OpenTelemetry Protocol (OTLP) for exporting traces.
Many observability systems support receiving traces via OTLP
(including Jaeger tracing).
Related configuration is in the :attr:`tracing` section of ``zuul.conf``.
.. _distributed tracing: https://opentelemetry.io/docs/concepts/observability-primer/#distributed-traces

View File

@ -37,3 +37,6 @@ cheroot!=8.1.*,!=8.2.*,!=8.3.0 # https://github.com/cherrypy/cheroot/issues/263
elasticsearch<8.0.0
PyMySQL
psycopg2-binary
opentelemetry-sdk
opentelemetry-exporter-otlp-proto-grpc
opentelemetry-exporter-otlp-proto-http

View File

@ -125,6 +125,8 @@ from zuul.lib.logutil import get_annotated_logger
import tests.fakegithub
import tests.fakegitlab
from tests.otlp_fixture import OTLPFixture
import opentelemetry.sdk.trace.export
FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'fixtures')
@ -4921,6 +4923,15 @@ class ZuulTestCase(BaseTestCase):
if 'database' in config.sections():
_setup_fixture(config, 'database')
if 'tracing' in config.sections():
self.otlp = OTLPFixture()
self.useFixture(self.otlp)
self.useFixture(fixtures.MonkeyPatch(
'zuul.lib.tracing.Tracing.processor_class',
opentelemetry.sdk.trace.export.SimpleSpanProcessor))
config.set('tracing', 'endpoint',
f'http://localhost:{self.otlp.port}')
if not self.setupSimpleLayout(config):
tenant_config = None
for cfg_attr in ('tenant_config', 'tenant_config_script'):
@ -5207,6 +5218,7 @@ class ZuulTestCase(BaseTestCase):
and not t.name.startswith('Dummy-')
and not t.name.startswith('pydevd.')
and not t.name.startswith('ptvsd.')
and not t.name.startswith('OTLPFixture_')
]
if len(threads) > 1:
thread_map = dict(map(lambda x: (x.ident, x.name),

42
tests/fixtures/zuul-tracing.conf vendored Normal file
View File

@ -0,0 +1,42 @@
[statsd]
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
# see: https://github.com/jsocol/pystatsd/issues/61
server=127.0.0.1
[scheduler]
tenant_config=main.yaml
relative_priority=true
[merger]
git_dir=/tmp/zuul-test/merger-git
git_user_email=zuul@example.com
git_user_name=zuul
[executor]
git_dir=/tmp/zuul-test/executor-git
load_multiplier=100
[connection gerrit]
driver=gerrit
server=review.example.com
user=jenkins
sshkey=fake_id_rsa_path
[connection smtp]
driver=smtp
server=localhost
port=25
default_from=zuul@example.com
default_to=you@example.com
[database]
dburi=$MYSQL_FIXTURE_DBURI$
[web]
static_cache_expiry=1200
root=https://zuul.example.com/
[tracing]
enabled=true
endpoint=http://localhost:port
service_name=zuultest

55
tests/otlp_fixture.py Normal file
View File

@ -0,0 +1,55 @@
# Copyright 2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from concurrent import futures
import fixtures
import grpc
from opentelemetry.proto.collector.trace.v1.trace_service_pb2_grpc import (
TraceServiceServicer,
add_TraceServiceServicer_to_server
)
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
ExportTraceServiceResponse,
)
class TraceServer(TraceServiceServicer):
    """Trace service implementation that records incoming requests.

    Each export request is stored on the fixture that owns this
    servicer so that tests can inspect what was received.
    """

    def __init__(self, fixture):
        super().__init__()
        # The object whose ``requests`` list collects received requests.
        self.fixture = fixture

    def Export(self, request, context):
        """Record *request* and reply with an empty export response."""
        received = self.fixture.requests
        received.append(request)
        return ExportTraceServiceResponse()
class OTLPFixture(fixtures.Fixture):
    """Fixture running an in-process gRPC server for OTLP trace exports.

    Every export request handled by the server is appended to
    ``requests``.  ``port`` holds the port number returned when the
    server was bound, so tests can point an exporter at it.
    """

    def __init__(self):
        super().__init__()
        self.requests = []
        # The thread name prefix lets the test suite's leaked-thread
        # check recognize (and ignore) this fixture's worker threads.
        self.executor = futures.ThreadPoolExecutor(
            thread_name_prefix='OTLPFixture',
            max_workers=10)
        self.server = grpc.server(self.executor)
        add_TraceServiceServicer_to_server(TraceServer(self), self.server)
        # Port 0 asks for any free port; the chosen one is returned.
        self.port = self.server.add_insecure_port('[::]:0')

    def _setUp(self):
        self.server.start()
        # fixtures.Fixture only runs callables registered through
        # addCleanup(); a method merely named _cleanup is never invoked
        # on its own, so register it explicitly.
        self.addCleanup(self._cleanup)

    def _cleanup(self):
        # grpc.Server.stop() requires a grace period argument; None
        # stops the server without waiting for in-flight RPCs.
        self.server.stop(None)
        self.server.wait_for_termination()
        self.executor.shutdown()

View File

@ -0,0 +1,41 @@
# Copyright 2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tests.base import iterate_timeout, ZuulTestCase
def attributes_to_dict(attrlist):
    """Map each attribute's key to the string value it carries.

    Every item of *attrlist* must expose ``key`` and
    ``value.string_value``.  If a key occurs more than once, the last
    occurrence wins.
    """
    return {item.key: item.value.string_value for item in attrlist}
class TestTracing(ZuulTestCase):
    # Uses a config with a [tracing] section so the test base starts
    # the OTLP fixture and exposes it as ``self.otlp``.
    config_file = 'zuul-tracing.conf'
    tenant_config_file = "config/single-tenant/main.yaml"

    def test_tracing(self):
        # Emit the test span from the scheduler, then wait until the
        # fake collector has received at least one export request.
        self.scheds.first.sched.tracing.test()
        for _ in iterate_timeout(60, "request to arrive"):
            if self.otlp.requests:
                break

        req = self.otlp.requests[0]
        self.log.debug("Received:\n%s", req)

        # The resource must carry the configured service name.
        resource_span = req.resource_spans[0]
        attrs = attributes_to_dict(resource_span.resource.attributes)
        self.assertEqual({"service.name": "zuultest"}, attrs)

        # The span must come from the "zuul" tracer and have the
        # expected name.
        scope_span = resource_span.scope_spans[0]
        self.assertEqual("zuul", scope_span.scope.name)
        span = scope_span.spans[0]
        self.assertEqual("test-trace", span.name)

108
zuul/lib/tracing.py Normal file
View File

@ -0,0 +1,108 @@
# Copyright 2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import grpc
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import \
OTLPSpanExporter as GRPCExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import \
OTLPSpanExporter as HTTPExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from zuul.lib.config import get_default, any_to_bool
class Tracing:
    """Set up OpenTelemetry tracing from the ``tracing`` config section.

    A tracer named "zuul" is always created.  A span processor and OTLP
    exporter are attached only when the ``enabled`` option is true;
    otherwise ``processor`` is None.

    :param config: The parsed Zuul configuration, read via get_default().
    :raises Exception: If the protocol is unknown, or if an option that
        is not valid for the selected protocol is set.
    """

    PROTOCOL_GRPC = 'grpc'
    PROTOCOL_HTTP_PROTOBUF = 'http/protobuf'
    # Kept as a class attribute so it can be replaced (the test suite
    # patches it with a different span processor class).
    processor_class = BatchSpanProcessor

    def __init__(self, config):
        service_name = get_default(config, "tracing", "service_name", "zuul")
        resource = Resource(attributes={SERVICE_NAME: service_name})
        provider = TracerProvider(resource=resource)
        enabled = get_default(config, "tracing", "enabled")
        if not any_to_bool(enabled):
            # Tracing is disabled: hand out a tracer from a provider
            # that has no span processor attached.
            self.processor = None
            self.tracer = provider.get_tracer("zuul")
            return

        protocol = get_default(config, "tracing", "protocol",
                               self.PROTOCOL_GRPC)
        endpoint = get_default(config, "tracing", "endpoint")
        tls_key = get_default(config, "tracing", "tls_key")
        tls_cert = get_default(config, "tracing", "tls_cert")
        tls_ca = get_default(config, "tracing", "tls_ca")
        certificate_file = get_default(config, "tracing", "certificate_file")
        insecure = get_default(config, "tracing", "insecure")
        if insecure is not None:
            insecure = any_to_bool(insecure)
        # NOTE(review): the value is handed to the exporter unchanged;
        # confirm the unit the exporter expects matches the docs.
        timeout = get_default(config, "tracing", "timeout")
        if timeout is not None:
            timeout = int(timeout)
        compression = get_default(config, "tracing", "compression")

        if protocol == self.PROTOCOL_GRPC:
            if certificate_file:
                raise Exception("The certificate_file tracing option "
                                f"is not valid for {protocol} endpoints")
            if any([tls_ca, tls_key, tls_cert]):
                # The credentials API takes file contents, not paths.
                # Unset options are passed through untouched.
                if tls_ca:
                    tls_ca = self._read_file(tls_ca)
                if tls_key:
                    tls_key = self._read_file(tls_key)
                if tls_cert:
                    tls_cert = self._read_file(tls_cert)
                creds = grpc.ssl_channel_credentials(
                    root_certificates=tls_ca,
                    private_key=tls_key,
                    certificate_chain=tls_cert)
            else:
                creds = None
            exporter = GRPCExporter(
                endpoint=endpoint,
                insecure=insecure,
                credentials=creds,
                timeout=timeout,
                compression=compression)
        elif protocol == self.PROTOCOL_HTTP_PROTOBUF:
            if insecure:
                raise Exception("The insecure tracing option "
                                f"is not valid for {protocol} endpoints")
            if any([tls_ca, tls_key, tls_cert]):
                raise Exception("The tls_* tracing options "
                                f"are not valid for {protocol} endpoints")
            exporter = HTTPExporter(
                endpoint=endpoint,
                certificate_file=certificate_file,
                timeout=timeout,
                compression=compression)
        else:
            raise Exception(f"Unknown tracing protocol {protocol}")

        self.processor = self.processor_class(exporter)
        provider.add_span_processor(self.processor)
        self.tracer = provider.get_tracer("zuul")

    @staticmethod
    def _read_file(path):
        """Return the bytes of the file at *path*, closing it afterwards."""
        with open(path, 'rb') as f:
            return f.read()

    def stop(self):
        """Shut down the span processor, if tracing was enabled."""
        if not self.processor:
            return
        self.processor.shutdown()

    def test(self):
        """Emit an empty span named 'test-trace'."""
        # TODO: remove once we have actual traces
        if not self.tracer:
            return
        with self.tracer.start_as_current_span('test-trace'):
            pass

View File

@ -42,6 +42,7 @@ from zuul.lib.monitoring import MonitoringServer
from zuul.lib.queue import NamedQueue
from zuul.lib.times import Times
from zuul.lib.statsd import get_statsd, normalize_statsd_name
from zuul.lib.tracing import Tracing
import zuul.lib.queue
import zuul.lib.repl
from zuul import nodepool
@ -190,6 +191,7 @@ class Scheduler(threading.Thread):
self.daemon = True
self.wait_for_init = wait_for_init
self.hostname = socket.getfqdn()
self.tracing = Tracing(config)
self.primed_event = threading.Event()
# Wake up the main run loop
self.wake_event = threading.Event()
@ -383,7 +385,10 @@ class Scheduler(threading.Thread):
self.log.debug("Stopping monitoring server")
self.monitoring_server.stop()
self.monitoring_server.join()
self.log.debug("Disconnecting from ZooKeeper")
self.zk_client.disconnect()
self.log.debug("Stopping tracing")
self.tracing.stop()
def runCommand(self):
while self._command_running: