speed up zipkin backend, add trivial benchmark
This commit is contained in:
38
tests/bench.py
Normal file
38
tests/bench.py
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||||
|
# may not use this file except in compliance with the License. You may
|
||||||
|
# obtain a copy of the License at
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
|
||||||
|
# applicable law or agreed to in writing, software distributed under
|
||||||
|
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
|
||||||
|
# OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and
|
||||||
|
# limitations under the License. See accompanying LICENSE file.
|
||||||
|
|
||||||
|
import tomograph
|
||||||
|
import cProfile
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
@tomograph.traced('test server', 'server response', port=80)
|
||||||
|
def server(latency):
|
||||||
|
time.sleep(latency)
|
||||||
|
|
||||||
|
|
||||||
|
@tomograph.traced('test client', 'client request')
|
||||||
|
def client(client_overhead, server_latency):
|
||||||
|
time.sleep(client_overhead)
|
||||||
|
server(server_latency)
|
||||||
|
|
||||||
|
def clientloop():
|
||||||
|
for i in xrange(10000):
|
||||||
|
client(0, 0)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
if len(sys.argv) > 1:
|
||||||
|
tomograph.config.set_backends(sys.argv[1:])
|
||||||
|
cProfile.run('clientloop()', 'tomo-bench')
|
||||||
|
|
||||||
|
|
||||||
@@ -9,8 +9,9 @@
|
|||||||
# License for the specific language governing permissions and
|
# License for the specific language governing permissions and
|
||||||
# limitations under the License. See accompanying LICENSE file.
|
# limitations under the License. See accompanying LICENSE file.
|
||||||
|
|
||||||
import zipkin_thrift
|
|
||||||
from generated.scribe import scribe
|
from generated.scribe import scribe
|
||||||
|
import sender
|
||||||
|
import zipkin_thrift
|
||||||
from thrift.transport import TTransport
|
from thrift.transport import TTransport
|
||||||
from thrift.transport import TSocket
|
from thrift.transport import TSocket
|
||||||
from thrift.protocol import TBinaryProtocol
|
from thrift.protocol import TBinaryProtocol
|
||||||
@@ -25,20 +26,37 @@ import random
|
|||||||
import socket
|
import socket
|
||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
|
import atexit
|
||||||
|
import threading
|
||||||
|
|
||||||
|
scribe_sender = sender.ScribeSender()
|
||||||
|
atexit.register(scribe_sender.close)
|
||||||
|
|
||||||
|
class Cache(object):
|
||||||
|
def __init__(self, thunk, size_limit=1000):
|
||||||
|
self._map = {}
|
||||||
|
self._thunk = thunk
|
||||||
|
self._size_limit = size_limit
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
def get(self, k):
|
||||||
|
with self._lock:
|
||||||
|
if self._map.has_key(k):
|
||||||
|
return self._map[k]
|
||||||
|
else:
|
||||||
|
while len(self._map) >= self._size_limit:
|
||||||
|
self._map.popitem()
|
||||||
|
v = self._thunk(k)
|
||||||
|
self._map[k] = v
|
||||||
|
return v
|
||||||
|
|
||||||
|
hostname_cache = Cache(socket.gethostbyname)
|
||||||
|
|
||||||
def send(span):
|
def send(span):
|
||||||
tsocket = TSocket.TSocket(config.zipkin_host, config.zipkin_port)
|
|
||||||
transport = TTransport.TFramedTransport(tsocket)
|
|
||||||
transport.open()
|
|
||||||
protocol = TBinaryProtocol.TBinaryProtocol(transport)
|
|
||||||
client = scribe.Client(protocol)
|
|
||||||
|
|
||||||
def endpoint(note):
|
def endpoint(note):
|
||||||
try:
|
try:
|
||||||
ip = socket.gethostbyname(note.address)
|
ip = hostname_cache.get(note.address)
|
||||||
except:
|
except:
|
||||||
print >>sys.stderr, 'host resolution error: ', traceback.format_exc()
|
print >>sys.stderr, 'host resolution error: ', traceback.format_exc()
|
||||||
ip = '0.0.0.0'
|
ip = '0.0.0.0'
|
||||||
@@ -57,12 +75,11 @@ def send(span):
|
|||||||
annotations = [annotation(n) for n in span.notes])
|
annotations = [annotation(n) for n in span.notes])
|
||||||
|
|
||||||
out = StringIO.StringIO()
|
out = StringIO.StringIO()
|
||||||
raw = TBinaryProtocol.TBinaryProtocol(out)
|
raw = TBinaryProtocol.TBinaryProtocolAccelerated(out)
|
||||||
zspan.write(raw)
|
zspan.write(raw)
|
||||||
logentry = scribe.LogEntry('zipkin', base64.b64encode(out.getvalue()))
|
scribe_sender.send('zipkin', base64.b64encode(out.getvalue()))
|
||||||
client.Log([logentry])
|
|
||||||
transport.close()
|
|
||||||
|
|
||||||
def ip_to_i32(ip_str):
|
def ip_to_i32(ip_str):
|
||||||
"""convert an ip address from a string to a signed 32-bit number"""
|
"""convert an ip address from a string to a signed 32-bit number"""
|
||||||
return -0x80000000 + (IPy.IP(ip_str).int() & 0x7fffffff)
|
return -0x80000000 + (IPy.IP(ip_str).int() & 0x7fffffff)
|
||||||
|
|
||||||
|
|||||||
@@ -14,8 +14,10 @@ import logging
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
enabled_backends = ['tomograph.backends.zipkin',
|
enabled_backends = ['tomograph.backends.zipkin',
|
||||||
'tomograph.backends.statsd',
|
#'tomograph.backends.statsd',
|
||||||
'tomograph.backends.log']
|
#'tomograph.backends.log'
|
||||||
|
]
|
||||||
|
#enabled_backends = []
|
||||||
backend_modules = []
|
backend_modules = []
|
||||||
|
|
||||||
def set_backends(backends):
|
def set_backends(backends):
|
||||||
@@ -44,9 +46,17 @@ def get_backends():
|
|||||||
set_backends(enabled_backends)
|
set_backends(enabled_backends)
|
||||||
return backend_modules
|
return backend_modules
|
||||||
|
|
||||||
zipkin_host = '172.16.77.141'
|
zipkin_host = '127.0.0.1'
|
||||||
zipkin_port = 9410
|
zipkin_port = 1463
|
||||||
|
|
||||||
statsd_host = '172.16.77.141'
|
# zipkin_port = 9410
|
||||||
|
|
||||||
|
#statsd_host = '127.0.0.1'
|
||||||
|
statsd_host = 'pairsscares.corp.gq1.yahoo.com'
|
||||||
statsd_port = 8125
|
statsd_port = 8125
|
||||||
|
|
||||||
|
zipkin_socket_timeout = 5.0
|
||||||
|
zipkin_max_queue_length = 50000
|
||||||
|
zipkin_target_write_size = 1000
|
||||||
|
|
||||||
|
debug = False
|
||||||
|
|||||||
Reference in New Issue
Block a user