From 00490a04be5d0af2e353cc29f0b7ef88bf337487 Mon Sep 17 00:00:00 2001 From: Mike Kolesnik Date: Thu, 30 Mar 2017 11:36:11 +0300 Subject: [PATCH] Create journal dependencies table The table will keep dependencies of journal entries upon one another, and will be utilized later on. Partially-Implements: blueprint dep-validations-on-create Change-Id: Ie39fde548016ecd20ae0f68d158c7e82b9dd3e8c --- networking_odl/db/db.py | 31 ++++++++++++-- .../alembic_migrations/versions/EXPAND_HEAD | 2 +- ...f56ff2fb_add_journal_dependencies_table.py | 42 +++++++++++++++++++ networking_odl/db/models.py | 21 +++++++++- networking_odl/tests/unit/db/test_db.py | 32 ++++++++++++++ 5 files changed, 121 insertions(+), 7 deletions(-) create mode 100644 networking_odl/db/migration/alembic_migrations/versions/pike/expand/0472f56ff2fb_add_journal_dependencies_table.py diff --git a/networking_odl/db/db.py b/networking_odl/db/db.py index e920b42ad..4db8d8e5c 100644 --- a/networking_odl/db/db.py +++ b/networking_odl/db/db.py @@ -17,6 +17,7 @@ import datetime from sqlalchemy import asc from sqlalchemy import func from sqlalchemy import or_ +from sqlalchemy.orm import aliased from networking_odl.common import constants as odl_const from networking_odl.db import models @@ -92,8 +93,17 @@ def get_all_db_rows_by_state(session, state): @db_api.retry_db_errors def get_oldest_pending_db_row_with_lock(session): with session.begin(): - row = session.query(models.OpenDaylightJournal).filter_by( - state=odl_const.PENDING).order_by( + journal_dep = aliased(models.OpenDaylightJournal) + dep_query = session.query(journal_dep).filter( + models.OpenDaylightJournal.seqnum == journal_dep.seqnum + ).outerjoin( + journal_dep.depending_on, aliased=True).filter( + or_(models.OpenDaylightJournal.state == odl_const.PENDING, + models.OpenDaylightJournal.state == odl_const.PROCESSING)) + row = session.query(models.OpenDaylightJournal).filter( + models.OpenDaylightJournal.state == odl_const.PENDING, + ~ dep_query.exists() + ).order_by( asc(models.OpenDaylightJournal.last_retried)).first() if row: update_db_row_state(session, row, odl_const.PROCESSING) @@ -101,6 +111,16 @@ def get_oldest_pending_db_row_with_lock(session): return row +def delete_dependency(session, entry): + """Delete dependency upon the given ID""" + conn = session.connection() + stmt = models.journal_dependencies.delete( + models.journal_dependencies.c.depends_on == entry.seqnum) + conn.execute(stmt) + + session.expire_all() + + @oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES) def update_db_row_state(session, row, state): row.state = state @@ -130,12 +150,15 @@ def delete_row(session, row=None, row_id=None): @oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES) def create_pending_row(session, object_type, object_uuid, - operation, data): + operation, data, depending_on=None): + if depending_on is None: + depending_on = [] row = models.OpenDaylightJournal(object_type=object_type, object_uuid=object_uuid, operation=operation, data=data, created_at=func.now(), - state=odl_const.PENDING) + state=odl_const.PENDING, + depending_on=depending_on) session.add(row) # Keep session flush for unit tests. NOOP for L2/L3 events since calls are # made inside database session transaction with subtransactions=True. diff --git a/networking_odl/db/migration/alembic_migrations/versions/EXPAND_HEAD b/networking_odl/db/migration/alembic_migrations/versions/EXPAND_HEAD index 242701896..ec499d0fd 100644 --- a/networking_odl/db/migration/alembic_migrations/versions/EXPAND_HEAD +++ b/networking_odl/db/migration/alembic_migrations/versions/EXPAND_HEAD @@ -1 +1 @@ -43af357fd638 +0472f56ff2fb diff --git a/networking_odl/db/migration/alembic_migrations/versions/pike/expand/0472f56ff2fb_add_journal_dependencies_table.py b/networking_odl/db/migration/alembic_migrations/versions/pike/expand/0472f56ff2fb_add_journal_dependencies_table.py new file mode 100644 index 000000000..acf632731 --- /dev/null +++ b/networking_odl/db/migration/alembic_migrations/versions/pike/expand/0472f56ff2fb_add_journal_dependencies_table.py @@ -0,0 +1,42 @@ +# Copyright 2017 Red Hat Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Add journal dependencies table + +Revision ID: 0472f56ff2fb +Revises: 43af357fd638 +Create Date: 2017-04-02 11:02:01.622548 + +""" + +# revision identifiers, used by Alembic. +revision = '0472f56ff2fb' +down_revision = '43af357fd638' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table( + 'opendaylight_journal_deps', + sa.Column('depends_on', sa.BigInteger(), + sa.ForeignKey('opendaylightjournal.seqnum', + ondelete='CASCADE'), + primary_key=True), + sa.Column('dependent', sa.BigInteger(), + sa.ForeignKey('opendaylightjournal.seqnum', + ondelete='CASCADE'), + primary_key=True)) diff --git a/networking_odl/db/models.py b/networking_odl/db/models.py index 094cc44c9..3c883e8aa 100644 --- a/networking_odl/db/models.py +++ b/networking_odl/db/models.py @@ -21,11 +21,22 @@ from neutron_lib.db import model_base from networking_odl.common import constants as odl_const +IdType = sa.BigInteger().with_variant(sa.Integer(), 'sqlite') + +journal_dependencies = sa.Table( + 'opendaylight_journal_deps', model_base.BASEV2.metadata, + sa.Column('depends_on', IdType, + sa.ForeignKey('opendaylightjournal.seqnum', ondelete='CASCADE'), + primary_key=True), + sa.Column('dependent', IdType, + sa.ForeignKey('opendaylightjournal.seqnum', ondelete='CASCADE'), + primary_key=True)) + + class OpenDaylightJournal(model_base.BASEV2): __tablename__ = 'opendaylightjournal' - seqnum = sa.Column(sa.BigInteger().with_variant(sa.Integer(), 'sqlite'), - primary_key=True, autoincrement=True) + seqnum = sa.Column(IdType, primary_key=True, autoincrement=True) object_type = sa.Column(sa.String(36), nullable=False) object_uuid = sa.Column(sa.String(36), nullable=False) operation = sa.Column(sa.String(36), nullable=False) @@ -41,6 +52,12 @@ class OpenDaylightJournal(model_base.BASEV2): last_retried = sa.Column(sa.TIMESTAMP, server_default=sa.func.now(), onupdate=sa.func.now()) version_id = sa.Column(sa.Integer, server_default='0', nullable=False) + dependencies = sa.orm.relationship( + "OpenDaylightJournal", secondary=journal_dependencies, + primaryjoin=seqnum == journal_dependencies.c.depends_on, + secondaryjoin=seqnum == journal_dependencies.c.dependent, + backref="depending_on" + ) __mapper_args__ = { 'version_id_col': version_id diff --git a/networking_odl/tests/unit/db/test_db.py b/networking_odl/tests/unit/db/test_db.py index 89ff2faaf..e2e647be3 100644 --- a/networking_odl/tests/unit/db/test_db.py +++ b/networking_odl/tests/unit/db/test_db.py @@ -157,6 +157,38 @@ class DbTestCase(test_base_db.ODLBaseDbTestCase): self.assertEqual(2, update_mock.call_count) + def _test_get_oldest_pending_row_with_dep(self, dep_state): + db.create_pending_row(self.db_session, *self.UPDATE_ROW) + parent_row = db.get_all_db_rows(self.db_session)[0] + db.update_db_row_state(self.db_session, parent_row, dep_state) + db.create_pending_row(self.db_session, *self.UPDATE_ROW, + depending_on=[parent_row]) + row = db.get_oldest_pending_db_row_with_lock(self.db_session) + if row is not None: + self.assertNotEqual(parent_row.seqnum, row.seqnum) + + return row + + def test_get_oldest_pending_row_when_dep_completed(self): + row = self._test_get_oldest_pending_row_with_dep(odl_const.COMPLETED) + self.assertEqual(odl_const.PROCESSING, row.state) + + def test_get_oldest_pending_row_when_dep_failed(self): + row = self._test_get_oldest_pending_row_with_dep(odl_const.FAILED) + self.assertEqual(odl_const.PROCESSING, row.state) + + def test_get_oldest_pending_row_returns_parent_when_dep_pending(self): + db.create_pending_row(self.db_session, *self.UPDATE_ROW) + parent_row = db.get_all_db_rows(self.db_session)[0] + db.create_pending_row(self.db_session, *self.UPDATE_ROW, + depending_on=[parent_row]) + row = db.get_oldest_pending_db_row_with_lock(self.db_session) + self.assertEqual(parent_row, row) + + def test_get_oldest_pending_row_none_when_dep_processing(self): + row = self._test_get_oldest_pending_row_with_dep(odl_const.PROCESSING) + self.assertIsNone(row) + def _test_delete_rows_by_state_and_time(self, last_retried, row_retention, state, expected_rows): db.create_pending_row(self.db_session, *self.UPDATE_ROW)