Change journal entry selection to optimistic locking
Journal entry selection with SELECT FOR UPDATE can cause deadlocks if calculating dependencies, it was changed to optimistic locking to avoid this. Change-Id: I5692b1f2e4bfe5eac3ead4b6ac32c64d5ea1f180
This commit is contained in:
@@ -94,8 +94,7 @@ def get_oldest_pending_db_row_with_lock(session):
|
|||||||
with session.begin():
|
with session.begin():
|
||||||
row = session.query(models.OpenDaylightJournal).filter_by(
|
row = session.query(models.OpenDaylightJournal).filter_by(
|
||||||
state=odl_const.PENDING).order_by(
|
state=odl_const.PENDING).order_by(
|
||||||
asc(models.OpenDaylightJournal.last_retried)).with_for_update(
|
asc(models.OpenDaylightJournal.last_retried)).first()
|
||||||
).first()
|
|
||||||
if row:
|
if row:
|
||||||
update_db_row_state(session, row, odl_const.PROCESSING)
|
update_db_row_state(session, row, odl_const.PROCESSING)
|
||||||
|
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
3d560427d776
|
43af357fd638
|
||||||
|
|||||||
@@ -0,0 +1,36 @@
|
|||||||
|
# Copyright (C) 2017 Red Hat Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
"""Added version_id for optimistic locking
|
||||||
|
|
||||||
|
Revision ID: 43af357fd638
|
||||||
|
Revises: 3d560427d776
|
||||||
|
Create Date: 2016-03-24 10:14:56.408413
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '43af357fd638'
|
||||||
|
down_revision = '3d560427d776'
|
||||||
|
depends_on = ('fa0c536252a5',)
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
op.add_column('opendaylightjournal',
|
||||||
|
sa.Column('version_id', sa.Integer, server_default='0',
|
||||||
|
nullable=False))
|
||||||
@@ -40,6 +40,11 @@ class OpenDaylightJournal(model_base.BASEV2):
|
|||||||
server_default=sa.func.now())
|
server_default=sa.func.now())
|
||||||
last_retried = sa.Column(sa.TIMESTAMP, server_default=sa.func.now(),
|
last_retried = sa.Column(sa.TIMESTAMP, server_default=sa.func.now(),
|
||||||
onupdate=sa.func.now())
|
onupdate=sa.func.now())
|
||||||
|
version_id = sa.Column(sa.Integer, server_default='0', nullable=False)
|
||||||
|
|
||||||
|
__mapper_args__ = {
|
||||||
|
'version_id_col': version_id
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class OpenDaylightMaintenance(model_base.BASEV2, model_base.HasId):
|
class OpenDaylightMaintenance(model_base.BASEV2, model_base.HasId):
|
||||||
|
|||||||
@@ -19,13 +19,13 @@ import operator
|
|||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
|
||||||
|
from sqlalchemy.orm import exc
|
||||||
|
|
||||||
from networking_odl.common import constants as odl_const
|
from networking_odl.common import constants as odl_const
|
||||||
from networking_odl.db import db
|
from networking_odl.db import db
|
||||||
from networking_odl.db import models
|
from networking_odl.db import models
|
||||||
from networking_odl.tests.unit import test_base_db
|
from networking_odl.tests.unit import test_base_db
|
||||||
|
|
||||||
from oslo_db.exception import DBDeadlock
|
|
||||||
|
|
||||||
|
|
||||||
class DbTestCase(test_base_db.ODLBaseDbTestCase):
|
class DbTestCase(test_base_db.ODLBaseDbTestCase):
|
||||||
|
|
||||||
@@ -144,9 +144,10 @@ class DbTestCase(test_base_db.ODLBaseDbTestCase):
|
|||||||
row = db.get_oldest_pending_db_row_with_lock(self.db_session)
|
row = db.get_oldest_pending_db_row_with_lock(self.db_session)
|
||||||
self.assertEqual(older_row, row)
|
self.assertEqual(older_row, row)
|
||||||
|
|
||||||
def test_get_oldest_pending_row_when_deadlock(self):
|
def test_get_oldest_pending_row_when_conflict(self):
|
||||||
db.create_pending_row(self.db_session, *self.UPDATE_ROW)
|
db.create_pending_row(self.db_session, *self.UPDATE_ROW)
|
||||||
update_mock = mock.MagicMock(side_effect=(DBDeadlock, mock.DEFAULT))
|
update_mock = mock.MagicMock(
|
||||||
|
side_effect=(exc.StaleDataError, mock.DEFAULT))
|
||||||
|
|
||||||
# Mocking is mandatory to achieve a deadlock regardless of the DB
|
# Mocking is mandatory to achieve a deadlock regardless of the DB
|
||||||
# backend being used when running the tests
|
# backend being used when running the tests
|
||||||
|
|||||||
Reference in New Issue
Block a user