import json
import logging
import os
import queue
import threading
import time

import sqlalchemy
from sqlalchemy import event
from sqlalchemy.engine import CreateEnginePlugin

# https://docs.sqlalchemy.org/en/14/core/connections.html?
# highlight=createengineplugin#sqlalchemy.engine.CreateEnginePlugin

LOG = logging.getLogger(__name__)

# The theory of operation here is that we register this plugin with
# sqlalchemy via an entry_point. It gets loaded by virtue of plugin=
# being in the database connection URL, which gives us an opportunity
# to hook the engines that get created.
#
# We opportunistically spawn a thread, which we feed "hits" to over a
# queue, and which occasionally writes those hits to a special
# database called 'stats'. We access that database with the same user,
# pass, and host as the main connection URL for simplicity.


class LogCursorEventsPlugin(CreateEnginePlugin):
    """Count DB statements per (database, operation) in a 'stats' database.

    Loaded by SQLAlchemy when ``plugin=`` names this entry point in the
    connection URL. Each executed statement is classified by its first
    word (SELECT, INSERT, ...) and queued; a daemon thread batches the
    counts and upserts them into a ``queries`` table.
    """

    def __init__(self, url, kwargs):
        self.db_name = url.database
        # Lazy %-style args: the formatting only happens if INFO is enabled.
        LOG.info('Registered counter for database %s', self.db_name)
        # Point a second engine at the 'stats' database on the same
        # server, reusing the main URL's credentials for simplicity.
        new_url = sqlalchemy.engine.URL.create(url.drivername,
                                               url.username,
                                               url.password,
                                               url.host,
                                               url.port,
                                               'stats')
        self.engine = sqlalchemy.create_engine(new_url)
        self.queue = queue.Queue()
        # Writer thread is started lazily (and respawned after fork)
        # by ensure_writer_thread().
        self.thread = None

    def update_url(self, url):
        """Strip our plugin's query argument from the URL before use."""
        return url.difference_update_query(["dbcounter"])

    def engine_created(self, engine):
        """Hook the engine creation process.

        This is the plug point for the sqlalchemy plugin. Using
        plugin=$this in the URL causes this method to be called when
        the engine is created, giving us a chance to hook it below.
        """
        event.listen(engine, "before_cursor_execute", self._log_event)

    def ensure_writer_thread(self):
        """(Re)spawn the daemon thread that drains the stats queue."""
        self.thread = threading.Thread(target=self.stat_writer, daemon=True)
        self.thread.start()

    def _log_event(self, conn, cursor, statement, parameters, context,
                   executemany):
        """Queue a "hit" for this operation to be recorded.

        Attempts to determine the operation by the first word of the
        statement, or 'OTHER' if it cannot be determined.
        """
        # Start our thread if not running. If we were forked after the
        # engine was created and this plugin was associated, our
        # writer thread is gone, so respawn.
        if not self.thread or not self.thread.is_alive():
            self.ensure_writer_thread()

        try:
            op = statement.strip().split(' ', 1)[0] or 'OTHER'
        except Exception:
            # Best-effort classification only; never break the query path.
            op = 'OTHER'

        self.queue.put((self.db_name, op))

    def do_incr(self, db, op, count):
        """Increment the counter for (db,op) by count.

        Failures are logged and swallowed: accounting must never take
        down the caller's database traffic.
        """
        query = sqlalchemy.text('INSERT INTO queries (db, op, count) '
                                ' VALUES (:db, :op, :count) '
                                ' ON DUPLICATE KEY UPDATE count=count+:count')
        try:
            with self.engine.begin() as conn:
                conn.execute(query, {'db': db, 'op': op, 'count': count})
        except Exception as e:
            LOG.error('Failed to account for access to database %r: %s',
                      db, e)

    def stat_writer(self):
        """Consume messages from the queue and write them in batches.

        This reads "hits" from a queue fed by _log_event() and
        writes (db,op)+=count stats to the database after ten seconds
        of no activity to avoid triggering a write for every SELECT
        call. Write no less often than every sixty seconds to avoid
        being starved by constant activity.
        """
        LOG.debug('[%i] Writer thread running', os.getpid())
        while True:
            to_write = {}
            last = time.time()
            # Accumulate hits until 10s of quiet or 60s total, whichever
            # comes first, then flush the batch.
            while time.time() - last < 60:
                try:
                    item = self.queue.get(timeout=10)
                    to_write.setdefault(item, 0)
                    to_write[item] += 1
                except queue.Empty:
                    break

            if to_write:
                LOG.debug('[%i] Writing DB stats %s',
                          os.getpid(),
                          ','.join(['%s:%s=%i' % (db, op, count)
                                    for (db, op), count in to_write.items()]))

            for (db, op), count in to_write.items():
                self.do_incr(db, op, count)