Merge "Flush tokens in batches with DB2"
This commit is contained in:
commit
83db9722c2
|
@ -370,6 +370,39 @@ class SqlToken(SqlTests, test_backend.TokenTests):
|
|||
self.mox.ReplayAll()
|
||||
tok.list_revoked_tokens()
|
||||
|
||||
def test_flush_expired_tokens_batch(self):
|
||||
# This test simply executes the code under test to verify
|
||||
# that the code is legal. It is not possible to test
|
||||
# whether records are deleted in batches using sqlite,
|
||||
# because the limit function does not seem to affect
|
||||
# delete subqueries; these are, however, legal.
|
||||
# After several failed attempts of using mox, it would
|
||||
# seem that the use of mock objects for testing
|
||||
# the target code does not seem possible, because of
|
||||
# the unique way the SQLAlchemy Query class's filter
|
||||
# method works.
|
||||
fixture = self.useFixture(moxstubout.MoxStubout())
|
||||
self.mox = fixture.mox
|
||||
tok = token_sql.Token()
|
||||
self.mox.StubOutWithMock(tok, 'token_flush_batch_size')
|
||||
# Just need a batch larger than 0; note that the code
|
||||
# path with batch_size = 0 is covered by test_backend,
|
||||
# where all backends' flush_expired_tokens methods
|
||||
# are tested.
|
||||
tok.token_flush_batch_size('sqlite').AndReturn(1)
|
||||
self.mox.ReplayAll()
|
||||
tok.flush_expired_tokens()
|
||||
|
||||
def test_token_flush_batch_size_default(self):
|
||||
tok = token_sql.Token()
|
||||
sqlite_batch = tok.token_flush_batch_size('sqlite')
|
||||
self.assertEqual(sqlite_batch, 0)
|
||||
|
||||
def test_token_flush_batch_size_db2(self):
|
||||
tok = token_sql.Token()
|
||||
db2_batch = tok.token_flush_batch_size('ibm_db_sa')
|
||||
self.assertEqual(db2_batch, 100)
|
||||
|
||||
|
||||
class SqlCatalog(SqlTests, test_backend.CatalogTests):
|
||||
def test_malformed_catalog_throws_error(self):
|
||||
|
|
|
@ -187,11 +187,41 @@ class Token(sql.Base, token.Driver):
|
|||
tokens.append(record)
|
||||
return tokens
|
||||
|
||||
def token_flush_batch_size(self, dialect):
|
||||
batch_size = 0
|
||||
if dialect == 'ibm_db_sa':
|
||||
# This functionality is limited to DB2, because
|
||||
# it is necessary to prevent the tranaction log
|
||||
# from filling up, whereas at least some of the
|
||||
# other supported databases do not support update
|
||||
# queries with LIMIT subqueries nor do they appear
|
||||
# to require the use of such queries when deleting
|
||||
# large numbers of records at once.
|
||||
batch_size = 100
|
||||
# Limit of 100 is known to not fill a transaction log
|
||||
# of default maximum size while not significantly
|
||||
# impacting the performance of large token purges on
|
||||
# systems where the maximum transaction log size has
|
||||
# been increased beyond the default.
|
||||
return batch_size
|
||||
|
||||
def flush_expired_tokens(self):
|
||||
session = self.get_session()
|
||||
|
||||
query = session.query(TokenModel)
|
||||
query = query.filter(TokenModel.expires < timeutils.utcnow())
|
||||
query.delete(synchronize_session=False)
|
||||
dialect = session.bind.dialect.name
|
||||
batch_size = self.token_flush_batch_size(dialect)
|
||||
if batch_size > 0:
|
||||
query = session.query(TokenModel.id)
|
||||
query = query.filter(TokenModel.expires < timeutils.utcnow())
|
||||
query = query.limit(batch_size).subquery()
|
||||
delete_query = (session.query(TokenModel).
|
||||
filter(TokenModel.id.in_(query)))
|
||||
while True:
|
||||
rowcount = delete_query.delete(synchronize_session=False)
|
||||
if rowcount == 0:
|
||||
break
|
||||
else:
|
||||
query = session.query(TokenModel)
|
||||
query = query.filter(TokenModel.expires < timeutils.utcnow())
|
||||
query.delete(synchronize_session=False)
|
||||
|
||||
session.flush()
|
||||
|
|
Loading…
Reference in New Issue