Move ovsdb_nested transaction to ovs_lib

The patch introduces new abstract method to the API abstract class. The
method is supposed to return a new Transaction object. Each
API object is capable of store one nested transaction which is returned
by context manager in case some transaction already exists.

As there are no projects in OpenStack that use inheritance directly from
API abstract class, it's safe to make this new create_transaction() abstract method.
Only projects that currenlty use ovsdb API are networking-ovn,
dragonflow and networking-l2gw. OVN and Dragonflow use only IDL
implementation and L2GW copies the code of API abstract class.

Closes-bug 1653517

Change-Id: I55dd417cae7ebbe0668ba5606949ce4ab045d251
This commit is contained in:
Jakub Libosvar 2017-01-02 09:59:35 -05:00 committed by Ihar Hrachyshka
parent 0e71248955
commit acfbd2d490
6 changed files with 168 additions and 74 deletions

View File

@ -14,6 +14,7 @@
import abc import abc
import collections import collections
import contextlib
import uuid import uuid
from oslo_config import cfg from oslo_config import cfg
@ -83,6 +84,7 @@ class Transaction(object):
class API(object): class API(object):
def __init__(self, context): def __init__(self, context):
self.context = context self.context = context
self._nested_txn = None
@staticmethod @staticmethod
def get(context, iface_name=None): def get(context, iface_name=None):
@ -92,7 +94,7 @@ class API(object):
return iface(context) return iface(context)
@abc.abstractmethod @abc.abstractmethod
def transaction(self, check_error=False, log_errors=True, **kwargs): def create_transaction(self, check_error=False, log_errors=True, **kwargs):
"""Create a transaction """Create a transaction
:param check_error: Allow the transaction to raise an exception? :param check_error: Allow the transaction to raise an exception?
@ -103,6 +105,28 @@ class API(object):
:rtype: :class:`Transaction` :rtype: :class:`Transaction`
""" """
@contextlib.contextmanager
def transaction(self, check_error=False, log_errors=True, **kwargs):
"""Create a transaction context.
:param check_error: Allow the transaction to raise an exception?
:type check_error: bool
:param log_errors: Log an error if the transaction fails?
:type log_errors: bool
:returns: Either a new transaction or an existing one.
:rtype: :class:`Transaction`
"""
if self._nested_txn:
yield self._nested_txn
else:
with self.create_transaction(
check_error, log_errors, **kwargs) as txn:
self._nested_txn = txn
try:
yield txn
finally:
self._nested_txn = None
@abc.abstractmethod @abc.abstractmethod
def add_manager(self, connection_uri): def add_manager(self, connection_uri):
"""Create a command to add a Manager to the OVS switch """Create a command to add a Manager to the OVS switch

View File

@ -207,7 +207,7 @@ class OvsdbIdl(api.API):
def _ovs(self): def _ovs(self):
return list(self._tables['Open_vSwitch'].rows.values())[0] return list(self._tables['Open_vSwitch'].rows.values())[0]
def transaction(self, check_error=False, log_errors=True, **kwargs): def create_transaction(self, check_error=False, log_errors=True, **kwargs):
return NeutronOVSDBTransaction(self, OvsdbIdl.ovsdb_connection, return NeutronOVSDBTransaction(self, OvsdbIdl.ovsdb_connection,
self.context.vsctl_timeout, self.context.vsctl_timeout,
check_error, log_errors) check_error, log_errors)

View File

@ -178,7 +178,7 @@ class BrExistsCommand(DbCommand):
class OvsdbVsctl(ovsdb.API): class OvsdbVsctl(ovsdb.API):
def transaction(self, check_error=False, log_errors=True, **kwargs): def create_transaction(self, check_error=False, log_errors=True, **kwargs):
return Transaction(self.context, check_error, log_errors, **kwargs) return Transaction(self.context, check_error, log_errors, **kwargs)
def add_manager(self, connection_uri): def add_manager(self, connection_uri):

View File

@ -10,8 +10,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import contextlib
from neutron_lib import constants from neutron_lib import constants
from neutron_lib import exceptions from neutron_lib import exceptions
from oslo_log import log as logging from oslo_log import log as logging
@ -93,25 +91,6 @@ class TrunkParentPort(object):
self.DEV_PREFIX, port_id) self.DEV_PREFIX, port_id)
self._transaction = None self._transaction = None
# TODO(jlibosva): Move nested transaction to ovs_lib
@contextlib.contextmanager
def ovsdb_transaction(self):
"""Context manager for ovsdb transaction.
The object caches whether its already in transaction and if it is, the
original transaction is returned. This behavior enables calling
manager several times while always getting the same transaction.
"""
if self._transaction:
yield self._transaction
else:
with self.bridge.ovsdb.transaction() as txn:
self._transaction = txn
try:
yield txn
finally:
self._transaction = None
def plug(self, br_int): def plug(self, br_int):
"""Plug patch ports between trunk bridge and given bridge. """Plug patch ports between trunk bridge and given bridge.
@ -136,7 +115,7 @@ class TrunkParentPort(object):
patch_trunk_attrs = get_patch_peer_attrs(self.patch_port_int_name, patch_trunk_attrs = get_patch_peer_attrs(self.patch_port_int_name,
self.port_mac, self.port_id) self.port_mac, self.port_id)
with self.ovsdb_transaction() as txn: with ovsdb.transaction() as txn:
txn.add(ovsdb.add_port(br_int.br_name, txn.add(ovsdb.add_port(br_int.br_name,
self.patch_port_int_name)) self.patch_port_int_name))
txn.add(ovsdb.db_set('Interface', self.patch_port_int_name, txn.add(ovsdb.db_set('Interface', self.patch_port_int_name,
@ -156,7 +135,7 @@ class TrunkParentPort(object):
subport. subport.
""" """
ovsdb = self.bridge.ovsdb ovsdb = self.bridge.ovsdb
with self.ovsdb_transaction() as txn: with ovsdb.transaction() as txn:
txn.add(ovsdb.del_br(self.bridge.br_name)) txn.add(ovsdb.del_br(self.bridge.br_name))
txn.add(ovsdb.del_port(self.patch_port_int_name, txn.add(ovsdb.del_port(self.patch_port_int_name,
bridge.br_name)) bridge.br_name))
@ -190,7 +169,7 @@ class SubPort(TrunkParentPort):
will be created. will be created.
""" """
ovsdb = self.bridge.ovsdb ovsdb = self.bridge.ovsdb
with self.ovsdb_transaction() as txn: with ovsdb.transaction() as txn:
super(SubPort, self).plug(br_int) super(SubPort, self).plug(br_int)
txn.add(ovsdb.db_set( txn.add(ovsdb.db_set(
"Port", self.patch_port_trunk_name, "Port", self.patch_port_trunk_name,
@ -206,7 +185,7 @@ class SubPort(TrunkParentPort):
subport. subport.
""" """
ovsdb = self.bridge.ovsdb ovsdb = self.bridge.ovsdb
with self.ovsdb_transaction() as txn: with ovsdb.transaction() as txn:
txn.add(ovsdb.del_port(self.patch_port_trunk_name, txn.add(ovsdb.del_port(self.patch_port_trunk_name,
self.bridge.br_name)) self.bridge.br_name))
txn.add(ovsdb.del_port(self.patch_port_int_name, txn.add(ovsdb.del_port(self.patch_port_int_name,

View File

@ -0,0 +1,137 @@
# Copyright (c) 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from neutron.agent.ovsdb import api
from neutron.tests import base
class FakeTransaction(object):
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, tb):
self.commit()
def commit(self):
"""Serves just for mock."""
class TestingAPI(api.API):
def create_transaction(self, check_error=False, log_errors=True, **kwargs):
return FakeTransaction()
def add_manager(self, connection_uri):
pass
def get_manager(self):
pass
def remove_manager(self, connection_uri):
pass
def add_br(self, name, may_exist=True, datapath_type=None):
pass
def del_br(self, name, if_exists=True):
pass
def br_exists(self, name):
pass
def port_to_br(self, name):
pass
def iface_to_br(self, name):
pass
def list_br(self):
pass
def br_get_external_id(self, name, field):
pass
def db_create(self, table, **col_values):
pass
def db_destroy(self, table, record):
pass
def db_set(self, table, record, *col_values):
pass
def db_add(self, table, record, column, *values):
pass
def db_clear(self, table, record, column):
pass
def db_get(self, table, record, column):
pass
def db_list(self, table, records=None, columns=None, if_exists=False):
pass
def db_find(self, table, *conditions, **kwargs):
pass
def set_controller(self, bridge, controllers):
pass
def del_controller(self, bridge):
pass
def get_controller(self, bridge):
pass
def set_fail_mode(self, bridge, mode):
pass
def add_port(self, bridge, port, may_exist=True):
pass
def del_port(self, port, bridge=None, if_exists=True):
pass
def list_ports(self, bridge):
pass
def list_ifaces(self, bridge):
pass
class TransactionTestCase(base.BaseTestCase):
def setUp(self):
super(TransactionTestCase, self).setUp()
self.api = TestingAPI(None)
mock.patch.object(FakeTransaction, 'commit').start()
def test_transaction_nested(self):
with self.api.transaction() as txn1:
with self.api.transaction() as txn2:
self.assertIs(txn1, txn2)
txn1.commit.assert_called_once_with()
def test_transaction_no_nested_transaction_after_error(self):
class TestException(Exception):
pass
with testtools.ExpectedException(TestException):
with self.api.transaction() as txn1:
raise TestException()
with self.api.transaction() as txn2:
self.assertIsNot(txn1, txn2)

View File

@ -16,57 +16,11 @@
import contextlib import contextlib
import mock import mock
from oslo_utils import uuidutils
import testtools import testtools
from neutron.common import utils as common_utils
from neutron.services.trunk.drivers.openvswitch.agent import trunk_manager from neutron.services.trunk.drivers.openvswitch.agent import trunk_manager
from neutron.tests import base from neutron.tests import base
NATIVE_OVSDB_CONNECTION = (
'neutron.agent.ovsdb.impl_idl.OvsdbIdl.ovsdb_connection')
class TrunkParentPortTestCase(base.BaseTestCase):
def setUp(self):
super(TrunkParentPortTestCase, self).setUp()
# Mock out connecting to ovsdb
mock.patch(NATIVE_OVSDB_CONNECTION).start()
trunk_id = uuidutils.generate_uuid()
port_id = uuidutils.generate_uuid()
trunk_mac = common_utils.get_random_mac('fa:16:3e:00:00:00'.split(':'))
self.trunk = trunk_manager.TrunkParentPort(
trunk_id, port_id, trunk_mac)
def test_multiple_transactions(self):
def method_inner(trunk):
with trunk.ovsdb_transaction() as txn:
return id(txn)
def method_outer(trunk):
with trunk.ovsdb_transaction() as txn:
return method_inner(trunk), id(txn)
with self.trunk.ovsdb_transaction() as txn1:
mock_commit = mock.patch.object(txn1, 'commit').start()
txn_inner_id, txn_outer_id = method_outer(self.trunk)
self.assertFalse(mock_commit.called)
self.assertTrue(mock_commit.called)
self.assertTrue(id(txn1) == txn_inner_id == txn_outer_id)
def test_transaction_raises_error(self):
class MyException(Exception):
pass
with testtools.ExpectedException(MyException):
with self.trunk.ovsdb_transaction() as txn1:
mock.patch.object(txn1, 'commit').start()
raise MyException()
self.assertIsNone(self.trunk._transaction)
with self.trunk.ovsdb_transaction() as txn2:
mock.patch.object(txn2, 'commit').start()
self.assertIsNot(txn1, txn2)
class TrunkManagerTestCase(base.BaseTestCase): class TrunkManagerTestCase(base.BaseTestCase):
"""Tests are aimed to cover negative cases to make sure there is no typo in """Tests are aimed to cover negative cases to make sure there is no typo in