Add option for wsrep_sync_wait

When using Galera, the wsrep_sync_wait option [1]
can change the behavior of a variety of Galera DQL/DML statements
such that a particular operation will pause until outstanding
write-sets are fully persisted to the local node.  The setting
supersedes the previous boolean parameter wsrep_causal_reads
which only affected SELECT, with an updated approach that allows
for fine-grained control of so-called "causality checks"
on individual statement types.  The legacy-compatible setting
of '1' indicates that READ/SELECT/BEGIN operations should
proceed only after any pending writesets are fully available.

The use case for this setting is for an application that
is running operations on multiple Galera nodes simultaenously.
An application that commits data on one node, and then immediately
uses a different connection (on a potentially different node)
to SELECT that data, may fail to see those changes if
"causality checks" for SELECT are not enabled.  While
a COMMIT operation in Galera will block locally until all other
nodes approve of the writeset, the operation does not block
for the subsequent period of time when other nodes are actually
persisting that writeset.  Setting up "causal reads"
in this case indicates that a SELECT operation will wait until
any writesets in progress are available, thus maintaining
serialization between the COMMIT and subsequent SELECT.

As the name implies, wsrep_sync_wait adds...waiting!  to the
operation, and thus directly impacts performance by adding
latency to SELECT operations or to the operations that have
been selected for causality checks, to the degree that
concurrent writesets are expected to be present.

Since it's not expected that most if any Openstack applications
actually need this setting in order to be effective with
Galera multi-master operation, and as the setting is available
within client session scope and also impacts performance,
making it available on a per-application basis means that
specific applications which may see issues under load can
choose to enable this setting, much in the way any other
"transaction isolation" settings might be made, without having
to add a cluster-wide performance penalty by setting it at the
Galera server level.

[1] https://mariadb.com/docs/ent/ref/mdb/system-variables/wsrep_sync_wait/

Change-Id: Iee7afcac8ba952a2d67a9ad9dd0e4eae3f42518e
This commit is contained in:
Mike Bayer 2022-11-28 12:20:46 -05:00
parent a191d2e629
commit 009d23df45
6 changed files with 79 additions and 9 deletions

View File

@ -59,6 +59,14 @@ database_opts = [
'set this to no value. Example: mysql_sql_mode='
),
),
cfg.IntOpt(
'mysql_wsrep_sync_wait',
default=None,
help=(
'For Galera only, configure wsrep_sync_wait causality '
'checks on new connections'
),
),
cfg.BoolOpt(
'mysql_enable_ndb',
default=False,

View File

@ -145,6 +145,7 @@ class _TransactionFactory(object):
self._engine_cfg = {
'sqlite_fk': _Default(False),
'mysql_sql_mode': _Default('TRADITIONAL'),
'mysql_wsrep_sync_wait': _Default(0),
'mysql_enable_ndb': _Default(False),
'connection_recycle_time': _Default(3600),
'connection_debug': _Default(0),
@ -218,6 +219,9 @@ class _TransactionFactory(object):
:param mysql_sql_mode: MySQL SQL mode, defaults to TRADITIONAL
:param mysql_wsrep_sync_wait: MySQL wsrep_sync_wait, defaults to False
(i.e. '0')
:param mysql_enable_ndb: enable MySQL Cluster (NDB) support
:param connection_recycle_time: connection pool recycle time,
@ -1244,6 +1248,8 @@ class LegacyEngineFacade(object):
:keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
(defaults to TRADITIONAL)
:keyword mysql_wsrep_sync_wait: value of wsrep_sync_wait for Galera
(defaults to '0')
:keyword mysql_enable_ndb: If True, transparently enables support for
handling MySQL Cluster (NDB).
(defaults to False)

View File

@ -162,6 +162,7 @@ def _vet_url(url):
replace=True,
)
def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
mysql_wsrep_sync_wait=None,
mysql_enable_ndb=False,
connection_recycle_time=3600,
connection_debug=0, max_pool_size=None, max_overflow=None,
@ -204,6 +205,7 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
_init_events(
engine,
mysql_sql_mode=mysql_sql_mode,
mysql_wsrep_sync_wait=mysql_wsrep_sync_wait,
sqlite_synchronous=sqlite_synchronous,
sqlite_fk=sqlite_fk,
thread_checkin=thread_checkin,
@ -301,19 +303,26 @@ def _init_events(engine, thread_checkin=True, connection_trace=False, **kw):
@_init_events.dispatch_for("mysql")
def _init_events(engine, mysql_sql_mode=None, **kw):
def _init_events(
engine, mysql_sql_mode=None, mysql_wsrep_sync_wait=None, **kw):
"""Set up event listeners for MySQL."""
if mysql_sql_mode is not None:
if mysql_sql_mode is not None or mysql_wsrep_sync_wait is not None:
@sqlalchemy.event.listens_for(engine, "connect")
def _set_session_sql_mode(dbapi_con, connection_rec):
def _set_session_variables(dbapi_con, connection_rec):
cursor = dbapi_con.cursor()
if mysql_sql_mode is not None:
cursor.execute("SET SESSION sql_mode = %s", [mysql_sql_mode])
if mysql_wsrep_sync_wait is not None:
cursor.execute(
"SET SESSION wsrep_sync_wait = %s",
[mysql_wsrep_sync_wait]
)
@sqlalchemy.event.listens_for(engine, "first_connect")
def _check_effective_sql_mode(dbapi_con, connection_rec):
if mysql_sql_mode is not None:
_set_session_sql_mode(dbapi_con, connection_rec)
if mysql_sql_mode is not None or mysql_wsrep_sync_wait is not None:
_set_session_variables(dbapi_con, connection_rec)
cursor = dbapi_con.cursor()
cursor.execute("SHOW VARIABLES LIKE 'sql_mode'")

View File

@ -24,6 +24,7 @@ from unittest import mock
import fixtures
from oslo_config import cfg
import sqlalchemy
from sqlalchemy import exc
from sqlalchemy import sql
from sqlalchemy import Column, MetaData, Table
from sqlalchemy.engine import url
@ -415,6 +416,7 @@ class EngineFacadeTestCase(test_base.BaseTestCase):
connection_debug=100,
max_pool_size=10,
mysql_sql_mode='TRADITIONAL',
mysql_wsrep_sync_wait=None,
mysql_enable_ndb=False,
sqlite_fk=False,
connection_recycle_time=mock.ANY,
@ -519,8 +521,15 @@ class SQLiteConnectTest(test_base.BaseTestCase):
class MysqlConnectTest(db_test_base._MySQLOpportunisticTestCase):
def _fixture(self, sql_mode):
return session.create_engine(self.engine.url, mysql_sql_mode=sql_mode)
def _fixture(self, sql_mode=None, mysql_wsrep_sync_wait=None):
kw = {}
if sql_mode is not None:
kw["mysql_sql_mode"] = sql_mode
if mysql_wsrep_sync_wait is not None:
kw["mysql_wsrep_sync_wait"] = mysql_wsrep_sync_wait
return session.create_engine(self.engine.url, **kw)
def _assert_sql_mode(self, engine, sql_mode_present, sql_mode_non_present):
with engine.connect() as conn:
@ -535,6 +544,36 @@ class MysqlConnectTest(db_test_base._MySQLOpportunisticTestCase):
sql_mode_non_present, mode
)
def test_mysql_wsrep_sync_wait_listener(self):
with self.engine.connect() as conn:
try:
conn.execute(
sql.text("show variables like '%wsrep_sync_wait%'")
).scalars(1).one()
except exc.NoResultFound:
self.skipTest("wsrep_sync_wait option is not available")
engine = self._fixture()
with engine.connect() as conn:
self.assertEqual(
"0",
conn.execute(
sql.text("show variables like '%wsrep_sync_wait%'")
).scalars(1).one(),
)
for wsrep_val in (2, 1, 5):
engine = self._fixture(mysql_wsrep_sync_wait=wsrep_val)
with engine.connect() as conn:
self.assertEqual(
str(wsrep_val),
conn.execute(
sql.text("show variables like '%wsrep_sync_wait%'")
).scalars(1).one(),
)
def test_set_mode_traditional(self):
engine = self._fixture(sql_mode='TRADITIONAL')
self._assert_sql_mode(engine, "TRADITIONAL", "ANSI")

View File

@ -0,0 +1,8 @@
---
features:
- |
Added new option mysql_wsrep_sync_wait which sets the Galera
"wsrep_sync_wait" variable on server login. This session-level variable
allows Galera to ensure that writesets are fully up to date before running
new queries, and may be used to tune application behavior when multiple
Galera masters are targeted for SQL operations simultaneously.

View File

@ -29,7 +29,7 @@ commands =
commands =
pre-commit run -a
# Run security linter
bandit -r oslo_db -x tests -n5 --skip B105,B311
bandit -r oslo_db -x tests -x oslo_db/tests -n5 --skip B105,B311
[testenv:venv]
commands = {posargs}