Merge "Fix a race condition in add_tunnel_endpoint()"

This commit is contained in:
Jenkins 2013-08-29 05:06:14 +00:00 committed by Gerrit Code Review
commit aacd1c3d20
4 changed files with 128 additions and 12 deletions

View File

@ -0,0 +1,64 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Add unique constraint for id column of TunnelEndpoint
Revision ID: 63afba73813
Revises: 3c6e57a23db4
Create Date: 2013-04-30 13:53:31.717450
"""
# revision identifiers, used by Alembic.
revision = '63afba73813'
down_revision = '3c6e57a23db4'
# Change to ['*'] if this migration applies to all plugins
migration_for_plugins = [
'neutron.plugins.openvswitch.ovs_neutron_plugin.OVSNeutronPluginV2',
]
from alembic import op
from neutron.db import migration
CONSTRAINT_NAME = 'uniq_ovs_tunnel_endpoints0id'
TABLE_NAME = 'ovs_tunnel_endpoints'
def upgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.create_unique_constraint(
name=CONSTRAINT_NAME,
source=TABLE_NAME,
local_cols=['id']
)
def downgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.drop_constraint(
name=CONSTRAINT_NAME,
tablename=TABLE_NAME,
type='unique'
)

View File

@ -16,8 +16,8 @@
# @author: Aaron Rosen, Nicira Networks, Inc.
# @author: Bob Kukura, Red Hat, Inc.
from sqlalchemy import func
from sqlalchemy.orm import exc
from sqlalchemy.sql import func
from neutron.common import exceptions as q_exc
import neutron.db.api as db
@ -25,6 +25,7 @@ from neutron.db import models_v2
from neutron.db import securitygroups_db as sg_db
from neutron.extensions import securitygroup as ext_sg
from neutron import manager
from neutron.openstack.common.db import exception as db_exc
from neutron.openstack.common import log as logging
from neutron.plugins.openvswitch.common import constants
from neutron.plugins.openvswitch import ovs_models_v2
@ -367,14 +368,33 @@ def _generate_tunnel_id(session):
return max_tunnel_id + 1
def add_tunnel_endpoint(ip):
session = db.get_session()
try:
tunnel = (session.query(ovs_models_v2.TunnelEndpoint).
filter_by(ip_address=ip).with_lockmode('update').one())
except exc.NoResultFound:
tunnel_id = _generate_tunnel_id(session)
tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id)
session.add(tunnel)
session.flush()
return tunnel
def add_tunnel_endpoint(ip, max_retries=10):
"""Return the endpoint of the given IP address or generate a new one."""
# NOTE(rpodolyaka): generation of a new tunnel endpoint must be put into a
# repeatedly executed transactional block to ensure it
# doesn't conflict with any other concurrently executed
# DB transactions in spite of the specified transactions
# isolation level value
for i in xrange(max_retries):
LOG.debug(_('Adding a tunnel endpoint for %s'), ip)
try:
session = db.get_session()
with session.begin(subtransactions=True):
tunnel = (session.query(ovs_models_v2.TunnelEndpoint).
filter_by(ip_address=ip).with_lockmode('update').
first())
if tunnel is None:
tunnel_id = _generate_tunnel_id(session)
tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id)
session.add(tunnel)
return tunnel
except db_exc.DBDuplicateEntry:
# a concurrent transaction has been commited, try again
LOG.debug(_('Adding a tunnel endpoint failed due to a concurrent'
'transaction had been commited (%s attempts left)'),
max_retries - (i + 1))
raise q_exc.NeutronException(message='Unable to generate a new tunnel id')

View File

@ -18,6 +18,7 @@
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.schema import UniqueConstraint
from neutron.db.models_v2 import model_base
@ -86,6 +87,9 @@ class NetworkBinding(model_base.BASEV2):
class TunnelEndpoint(model_base.BASEV2):
"""Represents tunnel endpoint in RPC mode."""
__tablename__ = 'ovs_tunnel_endpoints'
__table_args__ = (
UniqueConstraint('id', name='uniq_ovs_tunnel_endpoints0id'),
)
ip_address = Column(String(64), primary_key=True)
id = Column(Integer, nullable=False)

View File

@ -13,12 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import testtools
from testtools import matchers
from neutron.common import exceptions as q_exc
from neutron.db import api as db
from neutron.openstack.common.db import exception as db_exc
from neutron.openstack.common.db.sqlalchemy import session
from neutron.plugins.openvswitch import ovs_db_v2
from neutron.plugins.openvswitch import ovs_models_v2 as ovs_models
from neutron.tests import base
from neutron.tests.unit import test_db_plugin as test_plugin
@ -262,6 +266,30 @@ class TunnelAllocationsTest(base.BaseTestCase):
ovs_db_v2.release_tunnel(self.session, tunnel_id, TUNNEL_RANGES)
self.assertIsNone(ovs_db_v2.get_tunnel_allocation(tunnel_id))
def test_add_tunnel_endpoint_create_new_endpoint(self):
addr = '10.0.0.1'
ovs_db_v2.add_tunnel_endpoint(addr)
self.assertIsNotNone(self.session.query(ovs_models.TunnelEndpoint).
filter_by(ip_address=addr).first())
def test_add_tunnel_endpoint_retrieve_an_existing_endpoint(self):
addr = '10.0.0.1'
self.session.add(ovs_models.TunnelEndpoint(ip_address=addr, id=1))
self.session.flush()
tunnel = ovs_db_v2.add_tunnel_endpoint(addr)
self.assertEquals(tunnel.id, 1)
self.assertEquals(tunnel.ip_address, addr)
def test_add_tunnel_endpoint_handle_duplicate_error(self):
with mock.patch.object(session.Session, 'query') as query_mock:
error = db_exc.DBDuplicateEntry(['id'])
query_mock.side_effect = error
with testtools.ExpectedException(q_exc.NeutronException):
ovs_db_v2.add_tunnel_endpoint('10.0.0.1', 5)
self.assertEquals(query_mock.call_count, 5)
class NetworkBindingsTest(test_plugin.NeutronDbPluginV2TestCase):
def setUp(self):