Merge "Refactor retry mechanism used in some DB operations"

This commit is contained in:
Jenkins 2015-03-13 07:54:20 +00:00 committed by Gerrit Code Review
commit d6a12942f3
5 changed files with 56 additions and 46 deletions

View File

@ -18,6 +18,8 @@ from oslo_db.sqlalchemy import session
_FACADE = None _FACADE = None
MAX_RETRIES = 10
def _create_facade_lazily(): def _create_facade_lazily():
global _FACADE global _FACADE

View File

@ -17,14 +17,9 @@ from oslo_db import exception as db_exc
from oslo_log import log from oslo_log import log
from neutron.common import exceptions as exc from neutron.common import exceptions as exc
from neutron.i18n import _LW
from neutron.plugins.ml2 import driver_api as api from neutron.plugins.ml2 import driver_api as api
# Number of attempts to find a valid segment candidate and allocate it
DB_MAX_ATTEMPTS = 10
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -108,37 +103,32 @@ class TypeDriverHelper(api.TypeDriver):
filter_by(allocated=False, **filters)) filter_by(allocated=False, **filters))
# Selected segment can be allocated before update by someone else, # Selected segment can be allocated before update by someone else,
# We retry until update success or DB_MAX_ATTEMPTS attempts alloc = select.first()
for attempt in range(1, DB_MAX_ATTEMPTS + 1):
alloc = select.first()
if not alloc: if not alloc:
# No resource available # No resource available
return return
raw_segment = dict((k, alloc[k]) for k in self.primary_keys) raw_segment = dict((k, alloc[k]) for k in self.primary_keys)
LOG.debug("%(type)s segment allocate from pool, attempt " LOG.debug("%(type)s segment allocate from pool "
"%(attempt)s started with %(segment)s ", "started with %(segment)s ",
{"type": network_type, "attempt": attempt, {"type": network_type,
"segment": raw_segment})
count = (session.query(self.model).
filter_by(allocated=False, **raw_segment).
update({"allocated": True}))
if count:
LOG.debug("%(type)s segment allocate from pool "
"success with %(segment)s ",
{"type": network_type,
"segment": raw_segment}) "segment": raw_segment})
count = (session.query(self.model). return alloc
filter_by(allocated=False, **raw_segment).
update({"allocated": True}))
if count:
LOG.debug("%(type)s segment allocate from pool, attempt "
"%(attempt)s success with %(segment)s ",
{"type": network_type, "attempt": attempt,
"segment": raw_segment})
return alloc
# Segment allocated since select # Segment allocated since select
LOG.debug("Allocate %(type)s segment from pool, " LOG.debug("Allocate %(type)s segment from pool "
"attempt %(attempt)s failed with segment " "failed with segment %(segment)s",
"%(segment)s", {"type": network_type,
{"type": network_type, "attempt": attempt, "segment": raw_segment})
"segment": raw_segment}) # saving real exception in case we exceeded amount of attempts
raise db_exc.RetryRequest(
LOG.warning(_LW("Allocate %(type)s segment from pool failed " exc.NoNetworkFoundInMaximumAllowedAttempts())
"after %(number)s failed attempts"),
{"type": network_type, "number": DB_MAX_ATTEMPTS})
raise exc.NoNetworkFoundInMaximumAllowedAttempts()

View File

@ -18,6 +18,7 @@ import contextlib
from eventlet import greenthread from eventlet import greenthread
from oslo_concurrency import lockutils from oslo_concurrency import lockutils
from oslo_config import cfg from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as os_db_exception from oslo_db import exception as os_db_exception
from oslo_log import log from oslo_log import log
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
@ -597,8 +598,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.mechanism_manager.create_network_precommit(mech_context) self.mechanism_manager.create_network_precommit(mech_context)
return result, mech_context return result, mech_context
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
retry_on_request=True)
def _create_network_with_retries(self, context, network):
return self._create_network_db(context, network)
def create_network(self, context, network): def create_network(self, context, network):
result, mech_context = self._create_network_db(context, network) result, mech_context = self._create_network_with_retries(context,
network)
try: try:
self.mechanism_manager.create_network_postcommit(mech_context) self.mechanism_manager.create_network_postcommit(mech_context)
except ml2_exc.MechanismDriverError: except ml2_exc.MechanismDriverError:

View File

@ -16,9 +16,9 @@
import fixtures import fixtures
import logging as std_logging import logging as std_logging
import mock import mock
from oslo_db import exception as exc
from sqlalchemy.orm import query from sqlalchemy.orm import query
from neutron.common import exceptions as exc
import neutron.db.api as db import neutron.db.api as db
from neutron.plugins.ml2.drivers import helpers from neutron.plugins.ml2.drivers import helpers
from neutron.plugins.ml2.drivers import type_vlan from neutron.plugins.ml2.drivers import type_vlan
@ -134,15 +134,10 @@ class HelpersTest(testlib_api.SqlTestCase):
def test_allocate_partial_segment_first_attempt_fails(self): def test_allocate_partial_segment_first_attempt_fails(self):
expected = dict(physical_network=TENANT_NET) expected = dict(physical_network=TENANT_NET)
with mock.patch.object(query.Query, 'update', side_effect=[0, 1]): with mock.patch.object(query.Query, 'update', side_effect=[0, 1]):
self.assertRaises(
exc.RetryRequest,
self.driver.allocate_partially_specified_segment,
self.session, **expected)
observed = self.driver.allocate_partially_specified_segment( observed = self.driver.allocate_partially_specified_segment(
self.session, **expected) self.session, **expected)
self.check_raw_segment(expected, observed) self.check_raw_segment(expected, observed)
def test_allocate_partial_segment_all_attempts_fail(self):
with mock.patch.object(query.Query, 'update', return_value=0):
with mock.patch.object(helpers.LOG, 'warning') as log_warning:
self.assertRaises(
exc.NoNetworkFoundInMaximumAllowedAttempts,
self.driver.allocate_partially_specified_segment,
self.session)
log_warning.assert_called_once_with(mock.ANY, mock.ANY)

View File

@ -21,10 +21,13 @@ import testtools
import uuid import uuid
import webob import webob
from oslo_db import exception as db_exc
from neutron.common import constants from neutron.common import constants
from neutron.common import exceptions as exc from neutron.common import exceptions as exc
from neutron.common import utils from neutron.common import utils
from neutron import context from neutron import context
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2 as base_plugin from neutron.db import db_base_plugin_v2 as base_plugin
from neutron.db import l3_db from neutron.db import l3_db
from neutron.extensions import external_net as external_net from neutron.extensions import external_net as external_net
@ -241,6 +244,19 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
for expected, actual in zip(expected_segments, segments): for expected, actual in zip(expected_segments, segments):
self.assertEqual(expected, actual) self.assertEqual(expected, actual)
def test_create_network_segment_allocation_fails(self):
plugin = manager.NeutronManager.get_plugin()
with mock.patch.object(plugin.type_manager, 'create_network_segments',
side_effect=db_exc.RetryRequest(ValueError())) as f:
self.assertRaises(ValueError,
plugin.create_network,
context.get_admin_context(),
{'network': {'tenant_id': 'sometenant',
'name': 'dummy',
'admin_state_up': True,
'shared': False}})
self.assertEqual(db_api.MAX_RETRIES + 1, f.call_count)
class TestMl2SubnetsV2(test_plugin.TestSubnetsV2, class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
Ml2PluginV2TestCase): Ml2PluginV2TestCase):