Change DB counting mechanism
The mysql performance_schema method for counting per-database queries is very heavyweight in that it requires full logging (in a table) of every query. We do hundreds of thousands in the course of a tempest run, which ends up creating its own performance problem. This changes the approach we take, which is to bundle a very tiny sqlalchemy plugin module which counts just what we care about in a special database. It is more complex than just enabling the features in mysql, but it is a massively smaller runtime overhead. It also provides us the opportunity to easily zero the counters just before a tempest run. Change-Id: I361bc30bb970cdaf18b966951f217862d302f0b9
This commit is contained in:
parent
d450e146cc
commit
fe52d7f0a8
@ -151,12 +151,16 @@ function configure_database_mysql {
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
if [[ "$MYSQL_GATHER_PERFORMANCE" == "True" ]]; then
|
if [[ "$MYSQL_GATHER_PERFORMANCE" == "True" ]]; then
|
||||||
echo "enabling MySQL performance_schema items"
|
echo "enabling MySQL performance counting"
|
||||||
# Enable long query history
|
|
||||||
iniset -sudo $my_conf mysqld \
|
# Install our sqlalchemy plugin
|
||||||
performance-schema-consumer-events-statements-history-long TRUE
|
pip_install ${TOP_DIR}/tools/dbcounter
|
||||||
iniset -sudo $my_conf mysqld \
|
|
||||||
performance_schema_events_stages_history_long_size 1000000
|
# Create our stats database for accounting
|
||||||
|
recreate_database stats
|
||||||
|
mysql -u $DATABASE_USER -p$DATABASE_PASSWORD -h $MYSQL_HOST -e \
|
||||||
|
"CREATE TABLE queries (db VARCHAR(32), op VARCHAR(32),
|
||||||
|
count INT, PRIMARY KEY (db, op)) ENGINE MEMORY" stats
|
||||||
fi
|
fi
|
||||||
|
|
||||||
restart_service $MYSQL_SERVICE_NAME
|
restart_service $MYSQL_SERVICE_NAME
|
||||||
@ -218,7 +222,17 @@ function install_database_python_mysql {
|
|||||||
|
|
||||||
function database_connection_url_mysql {
    local db=$1
    local plugin=""

    # NOTE(danms): We don't enable perf on subnodes yet because the
    # plugin is not installed there
    if [[ "$MYSQL_GATHER_PERFORMANCE" == "True" ]] && is_service_enabled mysql; then
        plugin="&plugin=dbcounter"
    fi

    echo "$BASE_SQL_CONN/$db?charset=utf8$plugin"
}
|
||||||
|
|
||||||
|
|
||||||
|
13
stack.sh
13
stack.sh
@ -1512,6 +1512,19 @@ async_cleanup
|
|||||||
time_totals
|
time_totals
|
||||||
async_print_timing
|
async_print_timing
|
||||||
|
|
||||||
|
# Show (and then zero) the per-database query counters gathered by the
# dbcounter plugin during the stack run, so a following tempest run
# starts counting from a clean slate. Best-effort: mysql errors are
# discarded since the stats table may not exist on all configurations.
if is_service_enabled mysql && [[ "$MYSQL_GATHER_PERFORMANCE" == "True" && "$MYSQL_HOST" ]]; then
    echo ""
    echo ""
    echo "Post-stack database query stats:"
    mysql -u $DATABASE_USER -p$DATABASE_PASSWORD -h $MYSQL_HOST stats -e \
        'SELECT * FROM queries' -t 2>/dev/null
    mysql -u $DATABASE_USER -p$DATABASE_PASSWORD -h $MYSQL_HOST stats -e \
        'DELETE FROM queries' 2>/dev/null
fi
|
||||||
|
|
||||||
|
|
||||||
# Using the cloud
|
# Using the cloud
|
||||||
# ===============
|
# ===============
|
||||||
|
|
||||||
|
120
tools/dbcounter/dbcounter.py
Normal file
120
tools/dbcounter/dbcounter.py
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import queue
|
||||||
|
|
||||||
|
import sqlalchemy
|
||||||
|
from sqlalchemy.engine import CreateEnginePlugin
|
||||||
|
from sqlalchemy import event
|
||||||
|
|
||||||
|
# https://docs.sqlalchemy.org/en/14/core/connections.html?
|
||||||
|
# highlight=createengineplugin#sqlalchemy.engine.CreateEnginePlugin
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# The theory of operation here is that we register this plugin with
|
||||||
|
# sqlalchemy via an entry_point. It gets loaded by virtue of plugin=
|
||||||
|
# being in the database connection URL, which gives us an opportunity
|
||||||
|
# to hook the engines that get created.
|
||||||
|
#
|
||||||
|
# We opportunistically spawn a thread, which we feed "hits" to over a
|
||||||
|
# queue, and which occasionally writes those hits to a special
|
||||||
|
# database called 'stats'. We access that database with the same user,
|
||||||
|
# pass, and host as the main connection URL for simplicity.
|
||||||
|
|
||||||
|
|
||||||
|
class LogCursorEventsPlugin(CreateEnginePlugin):
    """Count per-database query operations via a sqlalchemy plugin.

    Loaded by sqlalchemy when plugin=dbcounter appears in the database
    connection URL, which lets us hook every engine created for that
    URL. Each executed statement is reduced to its leading SQL verb and
    queued; a daemon writer thread periodically flushes (db, op) counts
    in batches to a separate 'stats' database on the same server, using
    the same credentials as the main connection URL.
    """

    def __init__(self, url, kwargs):
        # Remember which database this engine serves; it is the key we
        # account hits under.
        self.db_name = url.database
        # Lazy %-style args so formatting is skipped when INFO is off.
        LOG.info('Registered counter for database %s', self.db_name)
        # Build a sibling URL pointing at the 'stats' database, reusing
        # the driver, credentials and host of the engine being created.
        new_url = sqlalchemy.engine.URL.create(url.drivername,
                                               url.username,
                                               url.password,
                                               url.host,
                                               url.port,
                                               'stats')

        self.engine = sqlalchemy.create_engine(new_url)
        self.queue = queue.Queue()
        self.thread = None

    def engine_created(self, engine):
        """Hook the engine creation process.

        This is the plug point for the sqlalchemy plugin. Using
        plugin=$this in the URL causes this method to be called when
        the engine is created, giving us a chance to hook it below.
        """
        event.listen(engine, "before_cursor_execute", self._log_event)

    def ensure_writer_thread(self):
        """(Re)spawn the background stats writer as a daemon thread."""
        # Daemon so a hung writer never blocks interpreter shutdown.
        self.thread = threading.Thread(target=self.stat_writer, daemon=True)
        self.thread.start()

    def _log_event(self, conn, cursor, statement, parameters, context,
                   executemany):
        """Queue a "hit" for this operation to be recorded.

        Attempts to determine the operation by the first word of the
        statement, or 'OTHER' if it cannot be determined.
        """

        # Start our thread if not running. If we were forked after the
        # engine was created and this plugin was associated, our
        # writer thread is gone, so respawn.
        if not self.thread or not self.thread.is_alive():
            self.ensure_writer_thread()

        try:
            op = statement.strip().split(' ', 1)[0] or 'OTHER'
        except Exception:
            op = 'OTHER'

        self.queue.put((self.db_name, op))

    def do_incr(self, db, op, count):
        """Increment the counter for (db,op) by count."""

        query = ('INSERT INTO queries (db, op, count) '
                 ' VALUES (%s, %s, %s) '
                 ' ON DUPLICATE KEY UPDATE count=count+%s')
        try:
            with self.engine.begin() as conn:
                conn.execute(query, (db, op, count, count))
        except Exception as e:
            # Accounting is best-effort; never let a stats failure
            # propagate into the application's own query path.
            LOG.error('Failed to account for access to database %r: %s',
                      db, e)

    def stat_writer(self):
        """Consume messages from the queue and write them in batches.

        This reads "hits" from a queue fed by _log_event() and
        writes (db,op)+=count stats to the database after ten seconds
        of no activity to avoid triggering a write for every SELECT
        call. Write no less often than every thirty seconds and/or 100
        pending hits to avoid being starved by constant activity.
        """
        LOG.debug('[%i] Writer thread running', os.getpid())
        while True:
            to_write = {}
            total = 0
            last = time.time()
            # Accumulate until 30s have elapsed, 100 hits are pending,
            # or the queue has been idle for the 10s get() timeout.
            while time.time() - last < 30 and total < 100:
                try:
                    item = self.queue.get(timeout=10)
                    to_write.setdefault(item, 0)
                    to_write[item] += 1
                    total += 1
                except queue.Empty:
                    break

            if to_write:
                LOG.debug('[%i] Writing DB stats %s',
                          os.getpid(),
                          ','.join(['%s:%s=%i' % (db, op, count)
                                    for (db, op), count in to_write.items()]))

                for (db, op), count in to_write.items():
                    self.do_incr(db, op, count)
|
3
tools/dbcounter/pyproject.toml
Normal file
3
tools/dbcounter/pyproject.toml
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
[build-system]
|
||||||
|
requires = ["sqlalchemy", "setuptools>=42"]
|
||||||
|
build-backend = "setuptools.build_meta"
|
14
tools/dbcounter/setup.cfg
Normal file
14
tools/dbcounter/setup.cfg
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
[metadata]
|
||||||
|
name = dbcounter
|
||||||
|
author = Dan Smith
|
||||||
|
author_email = dms@danplanet.com
|
||||||
|
version = 0.1
|
||||||
|
description = A teeny tiny dbcounter plugin for use with devstack
|
||||||
|
url = http://github.com/openstack/devstack
|
||||||
|
license = Apache
|
||||||
|
|
||||||
|
[options]
|
||||||
|
modules = dbcounter
|
||||||
|
entry_points =
|
||||||
|
[sqlalchemy.plugins]
|
||||||
|
dbcounter = dbcounter:LogCursorEventsPlugin
|
@ -83,13 +83,11 @@ def get_processes_stats(matches):
|
|||||||
def get_db_stats(host, user, passwd):
    """Fetch per-(db, op) query counters from the dbcounter 'stats' DB.

    Connects to the stats database with the given credentials, reads
    the accounting table written by the dbcounter plugin, and returns a
    list of row dicts with values coerced through tryint().
    """
    stats = []
    conn = pymysql.connect(host=host, user=user, password=passwd,
                           database='stats',
                           cursorclass=pymysql.cursors.DictCursor)
    with conn, conn.cursor() as cur:
        cur.execute('SELECT db,op,count FROM queries')
        stats.extend({key: tryint(val) for key, val in row.items()}
                     for row in cur)
    return stats
|
||||||
|
Loading…
Reference in New Issue
Block a user