diff --git a/eventlet/zipkin/README.rst b/eventlet/zipkin/README.rst new file mode 100644 index 0000000..a0dab00 --- /dev/null +++ b/eventlet/zipkin/README.rst @@ -0,0 +1,135 @@ +eventlet.zipkin +=============== + +`Zipkin `_ is a distributed tracing system developed at Twitter. +This package provides a WSGI application using eventlet +with tracing facility that complies with Zipkin. + +Why use it? +From the http://twitter.github.io/zipkin/: + +"Collecting traces helps developers gain deeper knowledge about how +certain requests perform in a distributed system. Let's say we're having +problems with user requests timing out. We can look up traced requests +that timed out and display it in the web UI. We'll be able to quickly +find the service responsible for adding the unexpected response time. If +the service has been annotated adequately we can also find out where in +that service the issue is happening." + + +Screenshot +---------- + +Zipkin web ui screenshots obtained when applying this module to +`OpenStack swift `_ are in example/. + + +Requirement +----------- + +A eventlet.zipkin needs `python scribe client `_ +and `thrift `_ (>=0.9), +because the zipkin collector speaks `scribe `_ protocol. +Below command will install both scribe client and thrift. + +Install facebook-scribe: + +:: + + pip install facebook-scribe + + + + +**Python**: ``2.6``, ``2.7`` (Because the current Python Thrift release doesn't +support Python 3) + + + +How to use +---------- + +Add tracing facility to your application +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Apply the monkey patch before you start wsgi server. + +.. code:: python + + # Add only 2 lines to your code + from eventlet.zipkin import patcher + patcher.enable_trace_patch() + + # existing code + from eventlet import wsgi + wsgi.server(sock, app) + +You can pass some parameters to ``enable_trace_patch()`` + +* host: Scribe daemon IP address (default: '127.0.0.1') +* port: Scribe daemon port (default: 9410) +* trace_app_log: A Boolean indicating if the tracer will trace application log together or not. This facility assume that your application uses python standard logging library. (default: False) +* sampling_rate: A Float value (0.0~1.0) that indicates the tracing frequency. If you specify 1.0, all requests are traced and sent to Zipkin collecotr. If you specify 0.1, only 1/10 requests are traced. (defult: 1.0) + + +(Option) Annotation API +~~~~~~~~~~~~~~~~~~~~~~~ +If you want to record additional information, +you can use below API from anywhere in your code. + +.. code:: python + + from eventlet.zipkin import api + + api.put_annotation('Cache miss for %s' % request) + api.put_key_value('key', 'value') + + + + +Zipkin simple setup +------------------- + +:: + + $ git clone https://github.com/twitter/zipkin.git + $ cd zipkin + # Open 3 terminals + (terminal1) $ bin/collector + (terminal2) $ bin/query + (terminal3) $ bin/web + +Access http://localhost:8080 from your browser. + + +(Option) fluentd +---------------- +If you want to buffer the tracing data for performance, +`fluentd scribe plugin `_ is available. +Since ``out_scribe plugin`` extends `Buffer Plugin `_ , +you can customize buffering parameters in the manner of fluentd. +Scribe plugin is included in td-agent by default. + + +Sample: ``/etc/td-agent/td-agent.conf`` + +:: + + # in_scribe + + type scribe + port 9999 + + + # out_scribe + + type scribe + host Zipkin_collector_IP + port 9410 + flush_interval 60s + buffer_chunk_limit 256m + + +| And, you need to specify ``patcher.enable_trace_patch(port=9999)`` for in_scribe. +| In this case, trace data is passed like below. +| Your application => Local fluentd in_scribe (9999) => Local fluentd out_scribe =====> Remote zipkin collector (9410) + diff --git a/eventlet/zipkin/__init__.py b/eventlet/zipkin/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/eventlet/zipkin/_thrift/README.rst b/eventlet/zipkin/_thrift/README.rst new file mode 100644 index 0000000..0317d50 --- /dev/null +++ b/eventlet/zipkin/_thrift/README.rst @@ -0,0 +1,8 @@ +_thrift +======== + +* This directory is auto-generated by Thrift Compiler by using + https://github.com/twitter/zipkin/blob/master/zipkin-thrift/src/main/thrift/com/twitter/zipkin/zipkinCore.thrift + +* Do not modify this directory. + diff --git a/eventlet/zipkin/_thrift/__init__.py b/eventlet/zipkin/_thrift/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/eventlet/zipkin/_thrift/zipkinCore.thrift b/eventlet/zipkin/_thrift/zipkinCore.thrift new file mode 100644 index 0000000..0787ca8 --- /dev/null +++ b/eventlet/zipkin/_thrift/zipkinCore.thrift @@ -0,0 +1,55 @@ +# Copyright 2012 Twitter Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +namespace java com.twitter.zipkin.gen +namespace rb Zipkin + +//************** Collection related structs ************** + +// these are the annotations we always expect to find in a span +const string CLIENT_SEND = "cs" +const string CLIENT_RECV = "cr" +const string SERVER_SEND = "ss" +const string SERVER_RECV = "sr" + +// this represents a host and port in a network +struct Endpoint { + 1: i32 ipv4, + 2: i16 port // beware that this will give us negative ports. some conversion needed + 3: string service_name // which service did this operation happen on? +} + +// some event took place, either one by the framework or by the user +struct Annotation { + 1: i64 timestamp // microseconds from epoch + 2: string value // what happened at the timestamp? + 3: optional Endpoint host // host this happened on +} + +enum AnnotationType { BOOL, BYTES, I16, I32, I64, DOUBLE, STRING } + +struct BinaryAnnotation { + 1: string key, + 2: binary value, + 3: AnnotationType annotation_type, + 4: optional Endpoint host +} + +struct Span { + 1: i64 trace_id // unique trace id, use for all spans in trace + 3: string name, // span name, rpc method for example + 4: i64 id, // unique span id, only used for this span + 5: optional i64 parent_id, // parent span id + 6: list annotations, // list of all annotations/events that occured + 8: list binary_annotations // any binary annotations +} diff --git a/eventlet/zipkin/_thrift/zipkinCore/__init__.py b/eventlet/zipkin/_thrift/zipkinCore/__init__.py new file mode 100644 index 0000000..adefd8e --- /dev/null +++ b/eventlet/zipkin/_thrift/zipkinCore/__init__.py @@ -0,0 +1 @@ +__all__ = ['ttypes', 'constants'] diff --git a/eventlet/zipkin/_thrift/zipkinCore/constants.py b/eventlet/zipkin/_thrift/zipkinCore/constants.py new file mode 100644 index 0000000..3e04f77 --- /dev/null +++ b/eventlet/zipkin/_thrift/zipkinCore/constants.py @@ -0,0 +1,14 @@ +# +# Autogenerated by Thrift Compiler (0.8.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# + +from thrift.Thrift import TType, TMessageType, TException +from ttypes import * + +CLIENT_SEND = "cs" +CLIENT_RECV = "cr" +SERVER_SEND = "ss" +SERVER_RECV = "sr" diff --git a/eventlet/zipkin/_thrift/zipkinCore/ttypes.py b/eventlet/zipkin/_thrift/zipkinCore/ttypes.py new file mode 100644 index 0000000..418911f --- /dev/null +++ b/eventlet/zipkin/_thrift/zipkinCore/ttypes.py @@ -0,0 +1,452 @@ +# +# Autogenerated by Thrift Compiler (0.8.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# + +from thrift.Thrift import TType, TMessageType, TException + +from thrift.transport import TTransport +from thrift.protocol import TBinaryProtocol, TProtocol +try: + from thrift.protocol import fastbinary +except: + fastbinary = None + + +class AnnotationType: + BOOL = 0 + BYTES = 1 + I16 = 2 + I32 = 3 + I64 = 4 + DOUBLE = 5 + STRING = 6 + + _VALUES_TO_NAMES = { + 0: "BOOL", + 1: "BYTES", + 2: "I16", + 3: "I32", + 4: "I64", + 5: "DOUBLE", + 6: "STRING", + } + + _NAMES_TO_VALUES = { + "BOOL": 0, + "BYTES": 1, + "I16": 2, + "I32": 3, + "I64": 4, + "DOUBLE": 5, + "STRING": 6, + } + + +class Endpoint: + """ + Attributes: + - ipv4 + - port + - service_name + """ + + thrift_spec = ( + None, # 0 + (1, TType.I32, 'ipv4', None, None, ), # 1 + (2, TType.I16, 'port', None, None, ), # 2 + (3, TType.STRING, 'service_name', None, None, ), # 3 + ) + + def __init__(self, ipv4=None, port=None, service_name=None,): + self.ipv4 = ipv4 + self.port = port + self.service_name = service_name + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I32: + self.ipv4 = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I16: + self.port = iprot.readI16(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.service_name = iprot.readString(); + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Endpoint') + if self.ipv4 is not None: + oprot.writeFieldBegin('ipv4', TType.I32, 1) + oprot.writeI32(self.ipv4) + oprot.writeFieldEnd() + if self.port is not None: + oprot.writeFieldBegin('port', TType.I16, 2) + oprot.writeI16(self.port) + oprot.writeFieldEnd() + if self.service_name is not None: + oprot.writeFieldBegin('service_name', TType.STRING, 3) + oprot.writeString(self.service_name) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class Annotation: + """ + Attributes: + - timestamp + - value + - host + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'timestamp', None, None, ), # 1 + (2, TType.STRING, 'value', None, None, ), # 2 + (3, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 3 + ) + + def __init__(self, timestamp=None, value=None, host=None,): + self.timestamp = timestamp + self.value = value + self.host = host + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.timestamp = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.value = iprot.readString(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRUCT: + self.host = Endpoint() + self.host.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Annotation') + if self.timestamp is not None: + oprot.writeFieldBegin('timestamp', TType.I64, 1) + oprot.writeI64(self.timestamp) + oprot.writeFieldEnd() + if self.value is not None: + oprot.writeFieldBegin('value', TType.STRING, 2) + oprot.writeString(self.value) + oprot.writeFieldEnd() + if self.host is not None: + oprot.writeFieldBegin('host', TType.STRUCT, 3) + self.host.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class BinaryAnnotation: + """ + Attributes: + - key + - value + - annotation_type + - host + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'key', None, None, ), # 1 + (2, TType.STRING, 'value', None, None, ), # 2 + (3, TType.I32, 'annotation_type', None, None, ), # 3 + (4, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 4 + ) + + def __init__(self, key=None, value=None, annotation_type=None, host=None,): + self.key = key + self.value = value + self.annotation_type = annotation_type + self.host = host + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.key = iprot.readString(); + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.value = iprot.readString(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I32: + self.annotation_type = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.STRUCT: + self.host = Endpoint() + self.host.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('BinaryAnnotation') + if self.key is not None: + oprot.writeFieldBegin('key', TType.STRING, 1) + oprot.writeString(self.key) + oprot.writeFieldEnd() + if self.value is not None: + oprot.writeFieldBegin('value', TType.STRING, 2) + oprot.writeString(self.value) + oprot.writeFieldEnd() + if self.annotation_type is not None: + oprot.writeFieldBegin('annotation_type', TType.I32, 3) + oprot.writeI32(self.annotation_type) + oprot.writeFieldEnd() + if self.host is not None: + oprot.writeFieldBegin('host', TType.STRUCT, 4) + self.host.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class Span: + """ + Attributes: + - trace_id + - name + - id + - parent_id + - annotations + - binary_annotations + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'trace_id', None, None, ), # 1 + None, # 2 + (3, TType.STRING, 'name', None, None, ), # 3 + (4, TType.I64, 'id', None, None, ), # 4 + (5, TType.I64, 'parent_id', None, None, ), # 5 + (6, TType.LIST, 'annotations', (TType.STRUCT,(Annotation, Annotation.thrift_spec)), None, ), # 6 + None, # 7 + (8, TType.LIST, 'binary_annotations', (TType.STRUCT,(BinaryAnnotation, BinaryAnnotation.thrift_spec)), None, ), # 8 + ) + + def __init__(self, trace_id=None, name=None, id=None, parent_id=None, annotations=None, binary_annotations=None,): + self.trace_id = trace_id + self.name = name + self.id = id + self.parent_id = parent_id + self.annotations = annotations + self.binary_annotations = binary_annotations + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.trace_id = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.name = iprot.readString(); + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I64: + self.id = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I64: + self.parent_id = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.LIST: + self.annotations = [] + (_etype3, _size0) = iprot.readListBegin() + for _i4 in xrange(_size0): + _elem5 = Annotation() + _elem5.read(iprot) + self.annotations.append(_elem5) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 8: + if ftype == TType.LIST: + self.binary_annotations = [] + (_etype9, _size6) = iprot.readListBegin() + for _i10 in xrange(_size6): + _elem11 = BinaryAnnotation() + _elem11.read(iprot) + self.binary_annotations.append(_elem11) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Span') + if self.trace_id is not None: + oprot.writeFieldBegin('trace_id', TType.I64, 1) + oprot.writeI64(self.trace_id) + oprot.writeFieldEnd() + if self.name is not None: + oprot.writeFieldBegin('name', TType.STRING, 3) + oprot.writeString(self.name) + oprot.writeFieldEnd() + if self.id is not None: + oprot.writeFieldBegin('id', TType.I64, 4) + oprot.writeI64(self.id) + oprot.writeFieldEnd() + if self.parent_id is not None: + oprot.writeFieldBegin('parent_id', TType.I64, 5) + oprot.writeI64(self.parent_id) + oprot.writeFieldEnd() + if self.annotations is not None: + oprot.writeFieldBegin('annotations', TType.LIST, 6) + oprot.writeListBegin(TType.STRUCT, len(self.annotations)) + for iter12 in self.annotations: + iter12.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.binary_annotations is not None: + oprot.writeFieldBegin('binary_annotations', TType.LIST, 8) + oprot.writeListBegin(TType.STRUCT, len(self.binary_annotations)) + for iter13 in self.binary_annotations: + iter13.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) diff --git a/eventlet/zipkin/api.py b/eventlet/zipkin/api.py new file mode 100644 index 0000000..cd03ec0 --- /dev/null +++ b/eventlet/zipkin/api.py @@ -0,0 +1,186 @@ +import os +import sys +import time +import struct +import socket +import random + +from eventlet.green import threading +from eventlet.zipkin._thrift.zipkinCore import ttypes +from eventlet.zipkin._thrift.zipkinCore.constants import SERVER_SEND + + +client = None +_tls = threading.local() # thread local storage + + +def put_annotation(msg, endpoint=None): + """ This is annotation API. + You can add your own annotation from in your code. + Annotation is recorded with timestamp automatically. + e.g.) put_annotation('cache hit for %s' % request) + + :param msg: String message + :param endpoint: host info + """ + if is_sample(): + a = ZipkinDataBuilder.build_annotation(msg, endpoint) + trace_data = get_trace_data() + trace_data.add_annotation(a) + + +def put_key_value(key, value, endpoint=None): + """ This is binary annotation API. + You can add your own key-value extra information from in your code. + Key-value doesn't have a time component. + e.g.) put_key_value('http.uri', '/hoge/index.html') + + :param key: String + :param value: String + :param endpoint: host info + """ + if is_sample(): + b = ZipkinDataBuilder.build_binary_annotation(key, value, endpoint) + trace_data = get_trace_data() + trace_data.add_binary_annotation(b) + + +def is_tracing(): + """ Return whether the current thread is tracking or not """ + return hasattr(_tls, 'trace_data') + + +def is_sample(): + """ Return whether it should record trace information + for the request or not + """ + return is_tracing() and _tls.trace_data.sampled + + +def get_trace_data(): + if is_tracing(): + return _tls.trace_data + + +def set_trace_data(trace_data): + _tls.trace_data = trace_data + + +def init_trace_data(): + if is_tracing(): + del _tls.trace_data + + +def _uniq_id(): + """ + Create a random 64-bit signed integer appropriate + for use as trace and span IDs. + XXX: By experimentation zipkin has trouble recording traces with ids + larger than (2 ** 56) - 1 + """ + return random.randint(0, (2 ** 56) - 1) + + +def generate_trace_id(): + return _uniq_id() + + +def generate_span_id(): + return _uniq_id() + + +class TraceData(object): + + END_ANNOTATION = SERVER_SEND + + def __init__(self, name, trace_id, span_id, parent_id, sampled, endpoint): + """ + :param name: RPC name (String) + :param trace_id: int + :param span_id: int + :param parent_id: int or None + :param sampled: lets the downstream servers know + if I should record trace data for the request (bool) + :param endpoint: zipkin._thrift.zipkinCore.ttypes.EndPoint + """ + self.name = name + self.trace_id = trace_id + self.span_id = span_id + self.parent_id = parent_id + self.sampled = sampled + self.endpoint = endpoint + self.annotations = [] + self.bannotations = [] + self._done = False + + def add_annotation(self, annotation): + if annotation.host is None: + annotation.host = self.endpoint + if not self._done: + self.annotations.append(annotation) + if annotation.value == self.END_ANNOTATION: + self.flush() + + def add_binary_annotation(self, bannotation): + if bannotation.host is None: + bannotation.host = self.endpoint + if not self._done: + self.bannotations.append(bannotation) + + def flush(self): + span = ZipkinDataBuilder.build_span(name=self.name, + trace_id=self.trace_id, + span_id=self.span_id, + parent_id=self.parent_id, + annotations=self.annotations, + bannotations=self.bannotations) + client.send_to_collector(span) + self.annotations = [] + self.bannotations = [] + self._done = True + + +class ZipkinDataBuilder: + @staticmethod + def build_span(name, trace_id, span_id, parent_id, + annotations, bannotations): + return ttypes.Span( + name=name, + trace_id=trace_id, + id=span_id, + parent_id=parent_id, + annotations=annotations, + binary_annotations=bannotations + ) + + @staticmethod + def build_annotation(value, endpoint=None): + if isinstance(value, unicode): + value = value.encode('utf-8') + return ttypes.Annotation(time.time() * 1000 * 1000, + str(value), endpoint) + + @staticmethod + def build_binary_annotation(key, value, endpoint=None): + annotation_type = ttypes.AnnotationType.STRING + return ttypes.BinaryAnnotation(key, value, annotation_type, endpoint) + + @staticmethod + def build_endpoint(ipv4=None, port=None, service_name=None): + if ipv4 is not None: + ipv4 = ZipkinDataBuilder._ipv4_to_int(ipv4) + if service_name is None: + service_name = ZipkinDataBuilder._get_script_name() + return ttypes.Endpoint( + ipv4=ipv4, + port=port, + service_name=service_name + ) + + @staticmethod + def _ipv4_to_int(ipv4): + return struct.unpack('!i', socket.inet_aton(ipv4))[0] + + @staticmethod + def _get_script_name(): + return os.path.basename(sys.argv[0]) diff --git a/eventlet/zipkin/client.py b/eventlet/zipkin/client.py new file mode 100644 index 0000000..c94070a --- /dev/null +++ b/eventlet/zipkin/client.py @@ -0,0 +1,56 @@ +import base64 +import warnings + +from scribe import scribe +from thrift.transport import TTransport, TSocket +from thrift.protocol import TBinaryProtocol + +from eventlet import GreenPile + + +CATEGORY = 'zipkin' + + +class ZipkinClient(object): + + def __init__(self, host='127.0.0.1', port=9410): + """ + :param host: zipkin collector IP addoress (default '127.0.0.1') + :param port: zipkin collector port (default 9410) + """ + self.host = host + self.port = port + self.pile = GreenPile(1) + self._connect() + + def _connect(self): + socket = TSocket.TSocket(self.host, self.port) + self.transport = TTransport.TFramedTransport(socket) + protocol = TBinaryProtocol.TBinaryProtocol(self.transport, + False, False) + self.scribe_client = scribe.Client(protocol) + try: + self.transport.open() + except TTransport.TTransportException as e: + warnings.warn(e.message) + + def _build_message(self, thrift_obj): + trans = TTransport.TMemoryBuffer() + protocol = TBinaryProtocol.TBinaryProtocolAccelerated(trans=trans) + thrift_obj.write(protocol) + return base64.b64encode(trans.getvalue()) + + def send_to_collector(self, span): + self.pile.spawn(self._send, span) + + def _send(self, span): + log_entry = scribe.LogEntry(CATEGORY, self._build_message(span)) + try: + self.scribe_client.Log([log_entry]) + except Exception as e: + msg = 'ZipkinClient send error %s' % str(e) + warnings.warn(msg) + self._connect() + + def close(self): + self.transport.close() diff --git a/eventlet/zipkin/example/ex1.png b/eventlet/zipkin/example/ex1.png new file mode 100755 index 0000000..7f7a049 Binary files /dev/null and b/eventlet/zipkin/example/ex1.png differ diff --git a/eventlet/zipkin/example/ex2.png b/eventlet/zipkin/example/ex2.png new file mode 100755 index 0000000..19dbc3a Binary files /dev/null and b/eventlet/zipkin/example/ex2.png differ diff --git a/eventlet/zipkin/example/ex3.png b/eventlet/zipkin/example/ex3.png new file mode 100755 index 0000000..5ff9860 Binary files /dev/null and b/eventlet/zipkin/example/ex3.png differ diff --git a/eventlet/zipkin/greenthread.py b/eventlet/zipkin/greenthread.py new file mode 100644 index 0000000..37e12d6 --- /dev/null +++ b/eventlet/zipkin/greenthread.py @@ -0,0 +1,33 @@ +from eventlet import greenthread + +from eventlet.zipkin import api + + +__original_init__ = greenthread.GreenThread.__init__ +__original_main__ = greenthread.GreenThread.main + + +def _patched__init(self, parent): + # parent thread saves current TraceData from tls to self + if api.is_tracing(): + self.trace_data = api.get_trace_data() + + __original_init__(self, parent) + + +def _patched_main(self, function, args, kwargs): + # child thread inherits TraceData + if hasattr(self, 'trace_data'): + api.set_trace_data(self.trace_data) + + __original_main__(self, function, args, kwargs) + + +def patch(): + greenthread.GreenThread.__init__ = _patched__init + greenthread.GreenThread.main = _patched_main + + +def unpatch(): + greenthread.GreenThread.__init__ = __original_init__ + greenthread.GreenThread.main = __original_main__ diff --git a/eventlet/zipkin/http.py b/eventlet/zipkin/http.py new file mode 100644 index 0000000..668c3f9 --- /dev/null +++ b/eventlet/zipkin/http.py @@ -0,0 +1,61 @@ +import warnings + +from eventlet.support import six +from eventlet.green import httplib +from eventlet.zipkin import api + + +# see https://twitter.github.io/zipkin/Instrumenting.html +HDR_TRACE_ID = 'X-B3-TraceId' +HDR_SPAN_ID = 'X-B3-SpanId' +HDR_PARENT_SPAN_ID = 'X-B3-ParentSpanId' +HDR_SAMPLED = 'X-B3-Sampled' + + +if six.PY2: + __org_endheaders__ = httplib.HTTPConnection.endheaders + __org_begin__ = httplib.HTTPResponse.begin + + def _patched_endheaders(self): + if api.is_tracing(): + trace_data = api.get_trace_data() + new_span_id = api.generate_span_id() + self.putheader(HDR_TRACE_ID, hex_str(trace_data.trace_id)) + self.putheader(HDR_SPAN_ID, hex_str(new_span_id)) + self.putheader(HDR_PARENT_SPAN_ID, hex_str(trace_data.span_id)) + self.putheader(HDR_SAMPLED, int(trace_data.sampled)) + api.put_annotation('Client Send') + + __org_endheaders__(self) + + def _patched_begin(self): + __org_begin__(self) + + if api.is_tracing(): + api.put_annotation('Client Recv (%s)' % self.status) + + +def patch(): + if six.PY2: + httplib.HTTPConnection.endheaders = _patched_endheaders + httplib.HTTPResponse.begin = _patched_begin + if six.PY3: + warnings.warn("Since current Python thrift release \ + doesn't support Python 3, eventlet.zipkin.http \ + doesn't also support Python 3 (http.client)") + + +def unpatch(): + if six.PY2: + httplib.HTTPConnection.endheaders = __org_endheaders__ + httplib.HTTPResponse.begin = __org_begin__ + if six.PY3: + pass + + +def hex_str(n): + """ + Thrift uses a binary representation of trace and span ids + HTTP headers use a hexadecimal representation of the same + """ + return '%0.16x' % (n,) diff --git a/eventlet/zipkin/log.py b/eventlet/zipkin/log.py new file mode 100644 index 0000000..b7f9d32 --- /dev/null +++ b/eventlet/zipkin/log.py @@ -0,0 +1,19 @@ +import logging + +from eventlet.zipkin import api + + +__original_handle__ = logging.Logger.handle + + +def _patched_handle(self, record): + __original_handle__(self, record) + api.put_annotation(record.getMessage()) + + +def patch(): + logging.Logger.handle = _patched_handle + + +def unpatch(): + logging.Logger.handle = __original_handle__ diff --git a/eventlet/zipkin/patcher.py b/eventlet/zipkin/patcher.py new file mode 100644 index 0000000..8e7d8ad --- /dev/null +++ b/eventlet/zipkin/patcher.py @@ -0,0 +1,41 @@ +from eventlet.zipkin import http +from eventlet.zipkin import wsgi +from eventlet.zipkin import greenthread +from eventlet.zipkin import log +from eventlet.zipkin import api +from eventlet.zipkin.client import ZipkinClient + + +def enable_trace_patch(host='127.0.0.1', port=9410, + trace_app_log=False, sampling_rate=1.0): + """ Apply monkey patch to trace your WSGI application. + + :param host: Scribe daemon IP address (default: '127.0.0.1') + :param port: Scribe daemon port (default: 9410) + :param trace_app_log: A Boolean indicating if the tracer will trace + application log together or not. This facility assume that + your application uses python standard logging library. + (default: False) + :param sampling_rate: A Float value (0.0~1.0) that indicates + the tracing frequency. If you specify 1.0, all request + are traced (and sent to Zipkin collecotr). + If you specify 0.1, only 1/10 requests are traced. (default: 1.0) + """ + api.client = ZipkinClient(host, port) + + # monkey patch for adding tracing facility + wsgi.patch(sampling_rate) + http.patch() + greenthread.patch() + + # monkey patch for capturing application log + if trace_app_log: + log.patch() + + +def disable_trace_patch(): + http.unpatch() + wsgi.unpatch() + greenthread.unpatch() + log.unpatch() + api.client.close() diff --git a/eventlet/zipkin/wsgi.py b/eventlet/zipkin/wsgi.py new file mode 100644 index 0000000..3d52911 --- /dev/null +++ b/eventlet/zipkin/wsgi.py @@ -0,0 +1,78 @@ +import random + +from eventlet import wsgi +from eventlet.zipkin import api +from eventlet.zipkin._thrift.zipkinCore.constants import \ + SERVER_RECV, SERVER_SEND +from eventlet.zipkin.http import \ + HDR_TRACE_ID, HDR_SPAN_ID, HDR_PARENT_SPAN_ID, HDR_SAMPLED + + +_sampler = None +__original_handle_one_response__ = wsgi.HttpProtocol.handle_one_response + + +def _patched_handle_one_response(self): + api.init_trace_data() + trace_id = int_or_none(self.headers.getheader(HDR_TRACE_ID)) + span_id = int_or_none(self.headers.getheader(HDR_SPAN_ID)) + parent_id = int_or_none(self.headers.getheader(HDR_PARENT_SPAN_ID)) + sampled = bool_or_none(self.headers.getheader(HDR_SAMPLED)) + if trace_id is None: # front-end server + trace_id = span_id = api.generate_trace_id() + parent_id = None + sampled = _sampler.sampling() + ip, port = self.request.getsockname()[:2] + ep = api.ZipkinDataBuilder.build_endpoint(ip, port) + trace_data = api.TraceData(name=self.command, + trace_id=trace_id, + span_id=span_id, + parent_id=parent_id, + sampled=sampled, + endpoint=ep) + api.set_trace_data(trace_data) + api.put_annotation(SERVER_RECV) + api.put_key_value('http.uri', self.path) + + __original_handle_one_response__(self) + + if api.is_sample(): + api.put_annotation(SERVER_SEND) + + +class Sampler(object): + def __init__(self, sampling_rate): + self.sampling_rate = sampling_rate + + def sampling(self): + # avoid generating unneeded random numbers + if self.sampling_rate == 1.0: + return True + r = random.random() + if r < self.sampling_rate: + return True + return False + + +def int_or_none(val): + if val is None: + return None + return int(val, 16) + + +def bool_or_none(val): + if val == '1': + return True + if val == '0': + return False + return None + + +def patch(sampling_rate): + global _sampler + _sampler = Sampler(sampling_rate) + wsgi.HttpProtocol.handle_one_response = _patched_handle_one_response + + +def unpatch(): + wsgi.HttpProtocol.handle_one_response = __original_handle_one_response__