Create journal dependencies table
The table will keep dependencies of journal entries upon one another, and will be utilized later on. Partially-Implements: blueprint dep-validations-on-create Change-Id: Ie39fde548016ecd20ae0f68d158c7e82b9dd3e8c
This commit is contained in:
@@ -17,6 +17,7 @@ import datetime
|
|||||||
from sqlalchemy import asc
|
from sqlalchemy import asc
|
||||||
from sqlalchemy import func
|
from sqlalchemy import func
|
||||||
from sqlalchemy import or_
|
from sqlalchemy import or_
|
||||||
|
from sqlalchemy.orm import aliased
|
||||||
|
|
||||||
from networking_odl.common import constants as odl_const
|
from networking_odl.common import constants as odl_const
|
||||||
from networking_odl.db import models
|
from networking_odl.db import models
|
||||||
@@ -92,8 +93,17 @@ def get_all_db_rows_by_state(session, state):
|
|||||||
@db_api.retry_db_errors
|
@db_api.retry_db_errors
|
||||||
def get_oldest_pending_db_row_with_lock(session):
|
def get_oldest_pending_db_row_with_lock(session):
|
||||||
with session.begin():
|
with session.begin():
|
||||||
row = session.query(models.OpenDaylightJournal).filter_by(
|
journal_dep = aliased(models.OpenDaylightJournal)
|
||||||
state=odl_const.PENDING).order_by(
|
dep_query = session.query(journal_dep).filter(
|
||||||
|
models.OpenDaylightJournal.seqnum == journal_dep.seqnum
|
||||||
|
).outerjoin(
|
||||||
|
journal_dep.depending_on, aliased=True).filter(
|
||||||
|
or_(models.OpenDaylightJournal.state == odl_const.PENDING,
|
||||||
|
models.OpenDaylightJournal.state == odl_const.PROCESSING))
|
||||||
|
row = session.query(models.OpenDaylightJournal).filter(
|
||||||
|
models.OpenDaylightJournal.state == odl_const.PENDING,
|
||||||
|
~ dep_query.exists()
|
||||||
|
).order_by(
|
||||||
asc(models.OpenDaylightJournal.last_retried)).first()
|
asc(models.OpenDaylightJournal.last_retried)).first()
|
||||||
if row:
|
if row:
|
||||||
update_db_row_state(session, row, odl_const.PROCESSING)
|
update_db_row_state(session, row, odl_const.PROCESSING)
|
||||||
@@ -101,6 +111,16 @@ def get_oldest_pending_db_row_with_lock(session):
|
|||||||
return row
|
return row
|
||||||
|
|
||||||
|
|
||||||
|
def delete_dependency(session, entry):
|
||||||
|
"""Delete dependency upon the given ID"""
|
||||||
|
conn = session.connection()
|
||||||
|
stmt = models.journal_dependencies.delete(
|
||||||
|
models.journal_dependencies.c.depends_on == entry.seqnum)
|
||||||
|
conn.execute(stmt)
|
||||||
|
|
||||||
|
session.expire_all()
|
||||||
|
|
||||||
|
|
||||||
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES)
|
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES)
|
||||||
def update_db_row_state(session, row, state):
|
def update_db_row_state(session, row, state):
|
||||||
row.state = state
|
row.state = state
|
||||||
@@ -130,12 +150,15 @@ def delete_row(session, row=None, row_id=None):
|
|||||||
|
|
||||||
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES)
|
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES)
|
||||||
def create_pending_row(session, object_type, object_uuid,
|
def create_pending_row(session, object_type, object_uuid,
|
||||||
operation, data):
|
operation, data, depending_on=None):
|
||||||
|
if depending_on is None:
|
||||||
|
depending_on = []
|
||||||
row = models.OpenDaylightJournal(object_type=object_type,
|
row = models.OpenDaylightJournal(object_type=object_type,
|
||||||
object_uuid=object_uuid,
|
object_uuid=object_uuid,
|
||||||
operation=operation, data=data,
|
operation=operation, data=data,
|
||||||
created_at=func.now(),
|
created_at=func.now(),
|
||||||
state=odl_const.PENDING)
|
state=odl_const.PENDING,
|
||||||
|
depending_on=depending_on)
|
||||||
session.add(row)
|
session.add(row)
|
||||||
# Keep session flush for unit tests. NOOP for L2/L3 events since calls are
|
# Keep session flush for unit tests. NOOP for L2/L3 events since calls are
|
||||||
# made inside database session transaction with subtransactions=True.
|
# made inside database session transaction with subtransactions=True.
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
43af357fd638
|
0472f56ff2fb
|
||||||
|
|||||||
@@ -0,0 +1,42 @@
|
|||||||
|
# Copyright 2017 Red Hat Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
"""Add journal dependencies table
|
||||||
|
|
||||||
|
Revision ID: 0472f56ff2fb
|
||||||
|
Revises: 43af357fd638
|
||||||
|
Create Date: 2017-04-02 11:02:01.622548
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '0472f56ff2fb'
|
||||||
|
down_revision = '43af357fd638'
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
op.create_table(
|
||||||
|
'opendaylight_journal_deps',
|
||||||
|
sa.Column('depends_on', sa.BigInteger(),
|
||||||
|
sa.ForeignKey('opendaylightjournal.seqnum',
|
||||||
|
ondelete='CASCADE'),
|
||||||
|
primary_key=True),
|
||||||
|
sa.Column('dependent', sa.BigInteger(),
|
||||||
|
sa.ForeignKey('opendaylightjournal.seqnum',
|
||||||
|
ondelete='CASCADE'),
|
||||||
|
primary_key=True))
|
||||||
@@ -21,11 +21,22 @@ from neutron_lib.db import model_base
|
|||||||
from networking_odl.common import constants as odl_const
|
from networking_odl.common import constants as odl_const
|
||||||
|
|
||||||
|
|
||||||
|
IdType = sa.BigInteger().with_variant(sa.Integer(), 'sqlite')
|
||||||
|
|
||||||
|
journal_dependencies = sa.Table(
|
||||||
|
'opendaylight_journal_deps', model_base.BASEV2.metadata,
|
||||||
|
sa.Column('depends_on', IdType,
|
||||||
|
sa.ForeignKey('opendaylightjournal.seqnum', ondelete='CASCADE'),
|
||||||
|
primary_key=True),
|
||||||
|
sa.Column('dependent', IdType,
|
||||||
|
sa.ForeignKey('opendaylightjournal.seqnum', ondelete='CASCADE'),
|
||||||
|
primary_key=True))
|
||||||
|
|
||||||
|
|
||||||
class OpenDaylightJournal(model_base.BASEV2):
|
class OpenDaylightJournal(model_base.BASEV2):
|
||||||
__tablename__ = 'opendaylightjournal'
|
__tablename__ = 'opendaylightjournal'
|
||||||
|
|
||||||
seqnum = sa.Column(sa.BigInteger().with_variant(sa.Integer(), 'sqlite'),
|
seqnum = sa.Column(IdType, primary_key=True, autoincrement=True)
|
||||||
primary_key=True, autoincrement=True)
|
|
||||||
object_type = sa.Column(sa.String(36), nullable=False)
|
object_type = sa.Column(sa.String(36), nullable=False)
|
||||||
object_uuid = sa.Column(sa.String(36), nullable=False)
|
object_uuid = sa.Column(sa.String(36), nullable=False)
|
||||||
operation = sa.Column(sa.String(36), nullable=False)
|
operation = sa.Column(sa.String(36), nullable=False)
|
||||||
@@ -41,6 +52,12 @@ class OpenDaylightJournal(model_base.BASEV2):
|
|||||||
last_retried = sa.Column(sa.TIMESTAMP, server_default=sa.func.now(),
|
last_retried = sa.Column(sa.TIMESTAMP, server_default=sa.func.now(),
|
||||||
onupdate=sa.func.now())
|
onupdate=sa.func.now())
|
||||||
version_id = sa.Column(sa.Integer, server_default='0', nullable=False)
|
version_id = sa.Column(sa.Integer, server_default='0', nullable=False)
|
||||||
|
dependencies = sa.orm.relationship(
|
||||||
|
"OpenDaylightJournal", secondary=journal_dependencies,
|
||||||
|
primaryjoin=seqnum == journal_dependencies.c.depends_on,
|
||||||
|
secondaryjoin=seqnum == journal_dependencies.c.dependent,
|
||||||
|
backref="depending_on"
|
||||||
|
)
|
||||||
|
|
||||||
__mapper_args__ = {
|
__mapper_args__ = {
|
||||||
'version_id_col': version_id
|
'version_id_col': version_id
|
||||||
|
|||||||
@@ -157,6 +157,38 @@ class DbTestCase(test_base_db.ODLBaseDbTestCase):
|
|||||||
|
|
||||||
self.assertEqual(2, update_mock.call_count)
|
self.assertEqual(2, update_mock.call_count)
|
||||||
|
|
||||||
|
def _test_get_oldest_pending_row_with_dep(self, dep_state):
|
||||||
|
db.create_pending_row(self.db_session, *self.UPDATE_ROW)
|
||||||
|
parent_row = db.get_all_db_rows(self.db_session)[0]
|
||||||
|
db.update_db_row_state(self.db_session, parent_row, dep_state)
|
||||||
|
db.create_pending_row(self.db_session, *self.UPDATE_ROW,
|
||||||
|
depending_on=[parent_row])
|
||||||
|
row = db.get_oldest_pending_db_row_with_lock(self.db_session)
|
||||||
|
if row is not None:
|
||||||
|
self.assertNotEqual(parent_row.seqnum, row.seqnum)
|
||||||
|
|
||||||
|
return row
|
||||||
|
|
||||||
|
def test_get_oldest_pending_row_when_dep_completed(self):
|
||||||
|
row = self._test_get_oldest_pending_row_with_dep(odl_const.COMPLETED)
|
||||||
|
self.assertEqual(odl_const.PROCESSING, row.state)
|
||||||
|
|
||||||
|
def test_get_oldest_pending_row_when_dep_failed(self):
|
||||||
|
row = self._test_get_oldest_pending_row_with_dep(odl_const.FAILED)
|
||||||
|
self.assertEqual(odl_const.PROCESSING, row.state)
|
||||||
|
|
||||||
|
def test_get_oldest_pending_row_returns_parent_when_dep_pending(self):
|
||||||
|
db.create_pending_row(self.db_session, *self.UPDATE_ROW)
|
||||||
|
parent_row = db.get_all_db_rows(self.db_session)[0]
|
||||||
|
db.create_pending_row(self.db_session, *self.UPDATE_ROW,
|
||||||
|
depending_on=[parent_row])
|
||||||
|
row = db.get_oldest_pending_db_row_with_lock(self.db_session)
|
||||||
|
self.assertEqual(parent_row, row)
|
||||||
|
|
||||||
|
def test_get_oldest_pending_row_none_when_dep_processing(self):
|
||||||
|
row = self._test_get_oldest_pending_row_with_dep(odl_const.PROCESSING)
|
||||||
|
self.assertIsNone(row)
|
||||||
|
|
||||||
def _test_delete_rows_by_state_and_time(self, last_retried, row_retention,
|
def _test_delete_rows_by_state_and_time(self, last_retried, row_retention,
|
||||||
state, expected_rows):
|
state, expected_rows):
|
||||||
db.create_pending_row(self.db_session, *self.UPDATE_ROW)
|
db.create_pending_row(self.db_session, *self.UPDATE_ROW)
|
||||||
|
|||||||
Reference in New Issue
Block a user