diff --git a/neutron/db/migration/alembic_migrations/versions/63afba73813_ovs_tunnelendpoints_id_unique.py b/neutron/db/migration/alembic_migrations/versions/63afba73813_ovs_tunnelendpoints_id_unique.py new file mode 100644 index 0000000000..6ab361d8a9 --- /dev/null +++ b/neutron/db/migration/alembic_migrations/versions/63afba73813_ovs_tunnelendpoints_id_unique.py @@ -0,0 +1,64 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Add unique constraint for id column of TunnelEndpoint + +Revision ID: 63afba73813 +Revises: 3c6e57a23db4 +Create Date: 2013-04-30 13:53:31.717450 + +""" + +# revision identifiers, used by Alembic. +revision = '63afba73813' +down_revision = '3c6e57a23db4' + +# Change to ['*'] if this migration applies to all plugins + +migration_for_plugins = [ + 'neutron.plugins.openvswitch.ovs_neutron_plugin.OVSNeutronPluginV2', +] + +from alembic import op + +from neutron.db import migration + + +CONSTRAINT_NAME = 'uniq_ovs_tunnel_endpoints0id' +TABLE_NAME = 'ovs_tunnel_endpoints' + + +def upgrade(active_plugins=None, options=None): + if not migration.should_run(active_plugins, migration_for_plugins): + return + + op.create_unique_constraint( + name=CONSTRAINT_NAME, + source=TABLE_NAME, + local_cols=['id'] + ) + + +def downgrade(active_plugins=None, options=None): + if not migration.should_run(active_plugins, migration_for_plugins): + return + + op.drop_constraint( + name=CONSTRAINT_NAME, + tablename=TABLE_NAME, + type='unique' + ) diff --git a/neutron/plugins/openvswitch/ovs_db_v2.py b/neutron/plugins/openvswitch/ovs_db_v2.py index 27b9032e40..4eb9015e21 100644 --- a/neutron/plugins/openvswitch/ovs_db_v2.py +++ b/neutron/plugins/openvswitch/ovs_db_v2.py @@ -16,8 +16,8 @@ # @author: Aaron Rosen, Nicira Networks, Inc. # @author: Bob Kukura, Red Hat, Inc. +from sqlalchemy import func from sqlalchemy.orm import exc -from sqlalchemy.sql import func from neutron.common import exceptions as q_exc import neutron.db.api as db @@ -25,6 +25,7 @@ from neutron.db import models_v2 from neutron.db import securitygroups_db as sg_db from neutron.extensions import securitygroup as ext_sg from neutron import manager +from neutron.openstack.common.db import exception as db_exc from neutron.openstack.common import log as logging from neutron.plugins.openvswitch.common import constants from neutron.plugins.openvswitch import ovs_models_v2 @@ -367,14 +368,33 @@ def _generate_tunnel_id(session): return max_tunnel_id + 1 -def add_tunnel_endpoint(ip): - session = db.get_session() - try: - tunnel = (session.query(ovs_models_v2.TunnelEndpoint). - filter_by(ip_address=ip).with_lockmode('update').one()) - except exc.NoResultFound: - tunnel_id = _generate_tunnel_id(session) - tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id) - session.add(tunnel) - session.flush() - return tunnel +def add_tunnel_endpoint(ip, max_retries=10): + """Return the endpoint of the given IP address or generate a new one.""" + + # NOTE(rpodolyaka): generation of a new tunnel endpoint must be put into a + # repeatedly executed transactional block to ensure it + # doesn't conflict with any other concurrently executed + # DB transactions in spite of the specified transactions + # isolation level value + for i in xrange(max_retries): + LOG.debug(_('Adding a tunnel endpoint for %s'), ip) + try: + session = db.get_session() + with session.begin(subtransactions=True): + tunnel = (session.query(ovs_models_v2.TunnelEndpoint). + filter_by(ip_address=ip).with_lockmode('update'). + first()) + + if tunnel is None: + tunnel_id = _generate_tunnel_id(session) + tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id) + session.add(tunnel) + + return tunnel + except db_exc.DBDuplicateEntry: + # a concurrent transaction has been commited, try again + LOG.debug(_('Adding a tunnel endpoint failed due to a concurrent' + 'transaction had been commited (%s attempts left)'), + max_retries - (i + 1)) + + raise q_exc.NeutronException(message='Unable to generate a new tunnel id') diff --git a/neutron/plugins/openvswitch/ovs_models_v2.py b/neutron/plugins/openvswitch/ovs_models_v2.py index 77a40a5ed4..3ca34f1c20 100644 --- a/neutron/plugins/openvswitch/ovs_models_v2.py +++ b/neutron/plugins/openvswitch/ovs_models_v2.py @@ -18,6 +18,7 @@ from sqlalchemy import Boolean, Column, ForeignKey, Integer, String +from sqlalchemy.schema import UniqueConstraint from neutron.db.models_v2 import model_base @@ -86,6 +87,9 @@ class NetworkBinding(model_base.BASEV2): class TunnelEndpoint(model_base.BASEV2): """Represents tunnel endpoint in RPC mode.""" __tablename__ = 'ovs_tunnel_endpoints' + __table_args__ = ( + UniqueConstraint('id', name='uniq_ovs_tunnel_endpoints0id'), + ) ip_address = Column(String(64), primary_key=True) id = Column(Integer, nullable=False) diff --git a/neutron/tests/unit/openvswitch/test_ovs_db.py b/neutron/tests/unit/openvswitch/test_ovs_db.py index 8831cca072..ca8d4dc0d7 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_db.py +++ b/neutron/tests/unit/openvswitch/test_ovs_db.py @@ -13,12 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import mock import testtools from testtools import matchers from neutron.common import exceptions as q_exc from neutron.db import api as db +from neutron.openstack.common.db import exception as db_exc +from neutron.openstack.common.db.sqlalchemy import session from neutron.plugins.openvswitch import ovs_db_v2 +from neutron.plugins.openvswitch import ovs_models_v2 as ovs_models from neutron.tests import base from neutron.tests.unit import test_db_plugin as test_plugin @@ -262,6 +266,30 @@ class TunnelAllocationsTest(base.BaseTestCase): ovs_db_v2.release_tunnel(self.session, tunnel_id, TUNNEL_RANGES) self.assertIsNone(ovs_db_v2.get_tunnel_allocation(tunnel_id)) + def test_add_tunnel_endpoint_create_new_endpoint(self): + addr = '10.0.0.1' + ovs_db_v2.add_tunnel_endpoint(addr) + self.assertIsNotNone(self.session.query(ovs_models.TunnelEndpoint). + filter_by(ip_address=addr).first()) + + def test_add_tunnel_endpoint_retrieve_an_existing_endpoint(self): + addr = '10.0.0.1' + self.session.add(ovs_models.TunnelEndpoint(ip_address=addr, id=1)) + self.session.flush() + + tunnel = ovs_db_v2.add_tunnel_endpoint(addr) + self.assertEquals(tunnel.id, 1) + self.assertEquals(tunnel.ip_address, addr) + + def test_add_tunnel_endpoint_handle_duplicate_error(self): + with mock.patch.object(session.Session, 'query') as query_mock: + error = db_exc.DBDuplicateEntry(['id']) + query_mock.side_effect = error + + with testtools.ExpectedException(q_exc.NeutronException): + ovs_db_v2.add_tunnel_endpoint('10.0.0.1', 5) + self.assertEquals(query_mock.call_count, 5) + class NetworkBindingsTest(test_plugin.NeutronDbPluginV2TestCase): def setUp(self):