From 6804135fcc8d1dd58e7eb5ca8ed7e11b4f6b935d Mon Sep 17 00:00:00 2001 From: "Tim Daly, Jr" Date: Tue, 7 May 2013 20:45:18 +0000 Subject: [PATCH] add dimensional tags ("binary_annotations" in zipkin) fix duration for annotations in the zipkin backend (not sure what it's good for, though) move the span/tag/note types into their own file so that backends can import them --- tests/basic.py | 5 +++- tomograph/backends/zipkin/zipkin.py | 26 +++++++++++++++-- tomograph/tomograph.py | 43 ++++++++++++++++++++--------- tomograph/types.py | 16 +++++++++++ 4 files changed, 74 insertions(+), 16 deletions(-) create mode 100644 tomograph/types.py diff --git a/tests/basic.py b/tests/basic.py index 0de4ed2..cd9fdf3 100755 --- a/tests/basic.py +++ b/tests/basic.py @@ -18,8 +18,11 @@ import time @tomograph.traced('test server', 'server response', port=80) def server(latency): + tomograph.annotate('this is an annotation') time.sleep(latency) - + tomograph.tag('this is double', 1.1) + tomograph.tag('this is a string', 'foo') + tomograph.tag('this is an int', 42) @tomograph.traced('test client', 'client request') def client(client_overhead, server_latency): diff --git a/tomograph/backends/zipkin/zipkin.py b/tomograph/backends/zipkin/zipkin.py index e950829..f61ecfb 100644 --- a/tomograph/backends/zipkin/zipkin.py +++ b/tomograph/backends/zipkin/zipkin.py @@ -48,15 +48,37 @@ def send(span): def annotation(note): return zipkin_thrift.Annotation(timestamp = int(note.time * 1e6), value = note.value, + duration = note.duration, host = endpoint(note)) + def binary_annotation(dimension): + if isinstance(dimension.value, str): + tag_type = zipkin_thrift.AnnotationType.STRING + val = dimension.value + elif isinstance(dimension.value, float): + tag_type = zipkin_thrift.AnnotationType.DOUBLE + val = struct.pack('>d', dimension.value) + print "encoding double" + elif isinstance(dimension.value, int): + tag_type = zipkin_thrift.AnnotationType.I64 + val = struct.pack('>q', dimension.value) + else: + raise RuntimeError("unsupported tag type") + return zipkin_thrift.BinaryAnnotation(key = dimension.key, + value = val, + annotation_type = tag_type, + host = endpoint(dimension)) + zspan = zipkin_thrift.Span(trace_id = span.trace_id, id = span.id, name = span.name, parent_id = span.parent_id, - annotations = [annotation(n) for n in span.notes]) + annotations = [annotation(n) for n in span.notes], + binary_annotations = \ + [binary_annotation(d) for d in span.dimensions]) out = StringIO.StringIO() - raw = TBinaryProtocol.TBinaryProtocolAccelerated(out) + #raw = TBinaryProtocol.TBinaryProtocolAccelerated(out) + raw = TBinaryProtocol.TBinaryProtocol(out) try: zspan.write(raw) except OverflowError: diff --git a/tomograph/tomograph.py b/tomograph/tomograph.py index 142db72..a93406e 100644 --- a/tomograph/tomograph.py +++ b/tomograph/tomograph.py @@ -10,12 +10,12 @@ # limitations under the License. See accompanying LICENSE file. import config +from types import Span, Note, Tag import random import sys import time from eventlet import corolocal -from collections import namedtuple import socket import pickle import base64 @@ -24,9 +24,6 @@ import webob.dec span_stack = corolocal.local() -Span = namedtuple('Span', 'trace_id parent_id id name notes') -Note = namedtuple('Note', 'time value service_name address port') - def start(service_name, name, address, port, trace_info=None): parent_id = None if hasattr(span_stack, 'trace_id'): @@ -40,7 +37,7 @@ def start(service_name, name, address, port, trace_info=None): parent_id = trace_info[1] span_stack.spans = [] - span = Span(trace_id, parent_id, getId(), name, []) + span = Span(trace_id, parent_id, getId(), name, [], []) span_stack.spans.append(span) annotate('start', service_name, address, port) @@ -56,15 +53,35 @@ def stop(name): for backend in config.get_backends(): backend.send(span) -def annotate(value, service_name=None, address=None, port=None): - last_span = span_stack.spans[-1] +def annotate(value, service_name=None, address=None, port=None, duration=None): + """add an annotation at a particular point in time (with an optional duration)""" + cur_span = span_stack.spans[-1] + # attempt to default some values if service_name is None: - last_note = last_span.notes[-1] - service_name = last_note.service_name - address = last_note.address - port = last_note.port - note = Note(time.time(), value, service_name, address, int(port)) + service_name = cur_span.notes[0].service_name + if address is None: + address = cur_span.notes[0].address + if port is None: + port = cur_span.notes[0].port + if duration is None: + duration = 0 + note = Note(time.time(), str(value), service_name, address, int(port), + int(duration)) span_stack.spans[-1].notes.append(note) + +def tag(key, value, service_name=None, address=None, port=None): + """add a key/value tag to the current span. values can be int, + float, or string.""" + assert isinstance(value, str) or isinstance(value, int) or isinstance(value, float) + cur_span = span_stack.spans[-1] + if service_name is None: + service_name = cur_span.notes[0].service_name + if address is None: + address = cur_span.notes[0].address + if port is None: + port = cur_span.notes[0].port + tag = Tag(str(key), value, service_name, address, port) + span_stack.spans[-1].dimensions.append(tag) def getId(): return random.randrange(sys.maxint >> 10) @@ -106,7 +123,7 @@ def before_execute(name): #print >>sys.stderr, 'connection is {0}:{1}'.format(h, port) #print >>sys.stderr, 'sql statement is {0}'.format(clauseelement) start(str(name) + 'db client', 'execute', h, port) - annotate(str(clauseelement)) + annotate(clauseelement) return handler def after_execute(name): diff --git a/tomograph/types.py b/tomograph/types.py new file mode 100644 index 0000000..05bed38 --- /dev/null +++ b/tomograph/types.py @@ -0,0 +1,16 @@ +# Copyright (c) 2012 Yahoo! Inc. All rights reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 Unless required by +# applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and +# limitations under the License. See accompanying LICENSE file. + +from collections import namedtuple + +Span = namedtuple('Span', 'trace_id parent_id id name notes dimensions') +Note = namedtuple('Note', 'time value service_name address port duration') +Tag = namedtuple('Tag', 'key value service_name address port')