add dimensional tags ("binary_annotations" in zipkin)

fix duration for annotations in the zipkin backend (not sure what it's good for, though)

move the span/tag/note types into their own file so that backends can import them
This commit is contained in:
Tim Daly, Jr
2013-05-07 20:45:18 +00:00
parent d100b555f8
commit 6804135fcc
4 changed files with 74 additions and 16 deletions

View File

@@ -18,8 +18,11 @@ import time
@tomograph.traced('test server', 'server response', port=80) @tomograph.traced('test server', 'server response', port=80)
def server(latency): def server(latency):
tomograph.annotate('this is an annotation')
time.sleep(latency) time.sleep(latency)
tomograph.tag('this is double', 1.1)
tomograph.tag('this is a string', 'foo')
tomograph.tag('this is an int', 42)
@tomograph.traced('test client', 'client request') @tomograph.traced('test client', 'client request')
def client(client_overhead, server_latency): def client(client_overhead, server_latency):

View File

@@ -48,15 +48,37 @@ def send(span):
def annotation(note): def annotation(note):
return zipkin_thrift.Annotation(timestamp = int(note.time * 1e6), return zipkin_thrift.Annotation(timestamp = int(note.time * 1e6),
value = note.value, value = note.value,
duration = note.duration,
host = endpoint(note)) host = endpoint(note))
def binary_annotation(dimension):
if isinstance(dimension.value, str):
tag_type = zipkin_thrift.AnnotationType.STRING
val = dimension.value
elif isinstance(dimension.value, float):
tag_type = zipkin_thrift.AnnotationType.DOUBLE
val = struct.pack('>d', dimension.value)
print "encoding double"
elif isinstance(dimension.value, int):
tag_type = zipkin_thrift.AnnotationType.I64
val = struct.pack('>q', dimension.value)
else:
raise RuntimeError("unsupported tag type")
return zipkin_thrift.BinaryAnnotation(key = dimension.key,
value = val,
annotation_type = tag_type,
host = endpoint(dimension))
zspan = zipkin_thrift.Span(trace_id = span.trace_id, zspan = zipkin_thrift.Span(trace_id = span.trace_id,
id = span.id, id = span.id,
name = span.name, name = span.name,
parent_id = span.parent_id, parent_id = span.parent_id,
annotations = [annotation(n) for n in span.notes]) annotations = [annotation(n) for n in span.notes],
binary_annotations = \
[binary_annotation(d) for d in span.dimensions])
out = StringIO.StringIO() out = StringIO.StringIO()
raw = TBinaryProtocol.TBinaryProtocolAccelerated(out) #raw = TBinaryProtocol.TBinaryProtocolAccelerated(out)
raw = TBinaryProtocol.TBinaryProtocol(out)
try: try:
zspan.write(raw) zspan.write(raw)
except OverflowError: except OverflowError:

View File

@@ -10,12 +10,12 @@
# limitations under the License. See accompanying LICENSE file. # limitations under the License. See accompanying LICENSE file.
import config import config
from types import Span, Note, Tag
import random import random
import sys import sys
import time import time
from eventlet import corolocal from eventlet import corolocal
from collections import namedtuple
import socket import socket
import pickle import pickle
import base64 import base64
@@ -24,9 +24,6 @@ import webob.dec
span_stack = corolocal.local() span_stack = corolocal.local()
Span = namedtuple('Span', 'trace_id parent_id id name notes')
Note = namedtuple('Note', 'time value service_name address port')
def start(service_name, name, address, port, trace_info=None): def start(service_name, name, address, port, trace_info=None):
parent_id = None parent_id = None
if hasattr(span_stack, 'trace_id'): if hasattr(span_stack, 'trace_id'):
@@ -40,7 +37,7 @@ def start(service_name, name, address, port, trace_info=None):
parent_id = trace_info[1] parent_id = trace_info[1]
span_stack.spans = [] span_stack.spans = []
span = Span(trace_id, parent_id, getId(), name, []) span = Span(trace_id, parent_id, getId(), name, [], [])
span_stack.spans.append(span) span_stack.spans.append(span)
annotate('start', service_name, address, port) annotate('start', service_name, address, port)
@@ -56,16 +53,36 @@ def stop(name):
for backend in config.get_backends(): for backend in config.get_backends():
backend.send(span) backend.send(span)
def annotate(value, service_name=None, address=None, port=None): def annotate(value, service_name=None, address=None, port=None, duration=None):
last_span = span_stack.spans[-1] """add an annotation at a particular point in time (with an optional duration)"""
cur_span = span_stack.spans[-1]
# attempt to default some values
if service_name is None: if service_name is None:
last_note = last_span.notes[-1] service_name = cur_span.notes[0].service_name
service_name = last_note.service_name if address is None:
address = last_note.address address = cur_span.notes[0].address
port = last_note.port if port is None:
note = Note(time.time(), value, service_name, address, int(port)) port = cur_span.notes[0].port
if duration is None:
duration = 0
note = Note(time.time(), str(value), service_name, address, int(port),
int(duration))
span_stack.spans[-1].notes.append(note) span_stack.spans[-1].notes.append(note)
def tag(key, value, service_name=None, address=None, port=None):
"""add a key/value tag to the current span. values can be int,
float, or string."""
assert isinstance(value, str) or isinstance(value, int) or isinstance(value, float)
cur_span = span_stack.spans[-1]
if service_name is None:
service_name = cur_span.notes[0].service_name
if address is None:
address = cur_span.notes[0].address
if port is None:
port = cur_span.notes[0].port
tag = Tag(str(key), value, service_name, address, port)
span_stack.spans[-1].dimensions.append(tag)
def getId(): def getId():
return random.randrange(sys.maxint >> 10) return random.randrange(sys.maxint >> 10)
@@ -106,7 +123,7 @@ def before_execute(name):
#print >>sys.stderr, 'connection is {0}:{1}'.format(h, port) #print >>sys.stderr, 'connection is {0}:{1}'.format(h, port)
#print >>sys.stderr, 'sql statement is {0}'.format(clauseelement) #print >>sys.stderr, 'sql statement is {0}'.format(clauseelement)
start(str(name) + 'db client', 'execute', h, port) start(str(name) + 'db client', 'execute', h, port)
annotate(str(clauseelement)) annotate(clauseelement)
return handler return handler
def after_execute(name): def after_execute(name):

16
tomograph/types.py Normal file
View File

@@ -0,0 +1,16 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
from collections import namedtuple
Span = namedtuple('Span', 'trace_id parent_id id name notes dimensions')
Note = namedtuple('Note', 'time value service_name address port duration')
Tag = namedtuple('Tag', 'key value service_name address port')