make the statsdb backend eventlet safe. without the lock, you get
multiple concurrent write errors.
This commit is contained in:
@@ -8,12 +8,14 @@
|
|||||||
# OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and
|
# License for the specific language governing permissions and
|
||||||
# limitations under the License. See accompanying LICENSE file.
|
# limitations under the License. See accompanying LICENSE file.
|
||||||
|
import eventlet
|
||||||
|
|
||||||
from tomograph import config
|
from tomograph import config
|
||||||
from tomograph import cache
|
from tomograph import cache
|
||||||
|
|
||||||
import logging
|
logging = eventlet.import_patched('logging')
|
||||||
import socket
|
socket = eventlet.import_patched('socket')
|
||||||
|
threading = eventlet.import_patched('threading')
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -21,12 +23,15 @@ udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|||||||
|
|
||||||
hostname_cache = cache.Cache(socket.gethostbyname)
|
hostname_cache = cache.Cache(socket.gethostbyname)
|
||||||
|
|
||||||
|
lock = threading.Lock()
|
||||||
|
|
||||||
def send(span):
|
def send(span):
|
||||||
|
|
||||||
def statsd_send(name, value, units):
|
def statsd_send(name, value, units):
|
||||||
stat = str(name).replace(' ', '-') + ':' + str(int(value)) + '|' + str(units)
|
stat = str(name).replace(' ', '-') + ':' + str(int(value)) + '|' + str(units)
|
||||||
#logger.info('sending stat {0}'.format(stat))
|
#logger.info('sending stat {0}'.format(stat))
|
||||||
udp_socket.sendto(stat, (hostname_cache.get(config.statsd_host), config.statsd_port))
|
with lock:
|
||||||
|
udp_socket.sendto(stat, (hostname_cache.get(config.statsd_host), config.statsd_port))
|
||||||
|
|
||||||
def server_name(note):
|
def server_name(note):
|
||||||
address = note.address.replace('.', '-')
|
address = note.address.replace('.', '-')
|
||||||
|
|||||||
@@ -103,13 +103,13 @@ def before_execute(name):
|
|||||||
port = conn.connection.connection.port
|
port = conn.connection.connection.port
|
||||||
#print >>sys.stderr, 'connection is {0}:{1}'.format(h, port)
|
#print >>sys.stderr, 'connection is {0}:{1}'.format(h, port)
|
||||||
#print >>sys.stderr, 'sql statement is {0}'.format(clauseelement)
|
#print >>sys.stderr, 'sql statement is {0}'.format(clauseelement)
|
||||||
#start(str(name) + 'db client', 'execute', h, port)
|
start(str(name) + 'db client', 'execute', h, port)
|
||||||
return handler
|
return handler
|
||||||
|
|
||||||
def after_execute(name):
|
def after_execute(name):
|
||||||
# name isn't used, at least not yet...
|
# name isn't used, at least not yet...
|
||||||
def handler(conn, clauseelement, multiparams, params, result):
|
def handler(conn, clauseelement, multiparams, params, result):
|
||||||
#stop('execute')
|
stop('execute')
|
||||||
pass
|
pass
|
||||||
return handler
|
return handler
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user