Adding distributed locking to central
The current locking implementation is limited to the running process. This introduces distributed locking that will help prevent race conditions when there are many instances of designate-central running. Closes-Bug: #1871332 Change-Id: I98f7f80ce365cdee33528f9964c03274f62a795a
This commit is contained in:
parent
19ec7d9cd1
commit
f6090d885c
|
@ -33,9 +33,9 @@ from dns import exception as dnsexception
|
|||
from oslo_config import cfg
|
||||
import oslo_messaging as messaging
|
||||
from oslo_log import log as logging
|
||||
from oslo_concurrency import lockutils
|
||||
|
||||
from designate import context as dcontext
|
||||
from designate import coordination
|
||||
from designate import exceptions
|
||||
from designate import dnsutils
|
||||
from designate import network_api
|
||||
|
@ -117,7 +117,7 @@ def synchronized_zone(zone_arg=1, new_zone=False):
|
|||
if zone_id in ZONE_LOCKS.held:
|
||||
return f(self, *args, **kwargs)
|
||||
|
||||
with lockutils.lock(lock_name):
|
||||
with self.coordination.get_lock(lock_name):
|
||||
try:
|
||||
ZONE_LOCKS.held.add(zone_id)
|
||||
return f(self, *args, **kwargs)
|
||||
|
@ -198,6 +198,10 @@ class Service(service.RPCService):
|
|||
threads=cfg.CONF['service:central'].threads,
|
||||
)
|
||||
|
||||
self.coordination = coordination.Coordination(
|
||||
self.service_name, self.tg
|
||||
)
|
||||
|
||||
self.network_api = network_api.get_network_api(cfg.CONF.network_api)
|
||||
|
||||
@property
|
||||
|
@ -233,8 +237,10 @@ class Service(service.RPCService):
|
|||
"configured")
|
||||
|
||||
super(Service, self).start()
|
||||
self.coordination.start()
|
||||
|
||||
def stop(self, graceful=True):
|
||||
self.coordination.stop()
|
||||
super(Service, self).stop(graceful)
|
||||
|
||||
@property
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
import math
|
||||
import time
|
||||
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_log import log
|
||||
import tenacity
|
||||
import tooz.coordination
|
||||
|
@ -51,6 +52,14 @@ class Coordination(object):
|
|||
def started(self):
|
||||
return self._started
|
||||
|
||||
def get_lock(self, name):
|
||||
if self._coordinator:
|
||||
# NOTE(eandersson): Workaround until tooz handles the conversion.
|
||||
if not isinstance(name, bytes):
|
||||
name = name.encode('ascii')
|
||||
return self._coordinator.get_lock(name)
|
||||
return lockutils.lock(name)
|
||||
|
||||
def start(self):
|
||||
self.coordination_id = ":".join([CONF.host, generate_uuid()])
|
||||
|
||||
|
|
|
@ -9,6 +9,8 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import mock
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_log import log as logging
|
||||
|
||||
from designate import exceptions
|
||||
|
@ -21,6 +23,11 @@ from designate.tests.test_central import CentralTestCase
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FakeCoordination(object):
|
||||
def get_lock(self, name):
|
||||
return lockutils.lock(name)
|
||||
|
||||
|
||||
class CentralDecoratorTests(CentralTestCase):
|
||||
def test_synchronized_zone_exception_raised(self):
|
||||
@service.synchronized_zone()
|
||||
|
@ -31,7 +38,8 @@ class CentralDecoratorTests(CentralTestCase):
|
|||
|
||||
for index in range(9):
|
||||
try:
|
||||
mock_get_zone(object, index,
|
||||
mock_get_zone(mock.Mock(coordination=FakeCoordination()),
|
||||
index,
|
||||
zone.Zone(id=utils.generate_uuid()))
|
||||
except exceptions.ZoneNotFound:
|
||||
pass
|
||||
|
@ -46,9 +54,11 @@ class CentralDecoratorTests(CentralTestCase):
|
|||
def mock_get_zone(cls, context, zone):
|
||||
self.assertEqual(service.ZONE_LOCKS.held, {zone.id})
|
||||
|
||||
mock_create_record(object, self.get_context(),
|
||||
mock_create_record(mock.Mock(coordination=FakeCoordination()),
|
||||
self.get_context(),
|
||||
record=record.Record(zone_id=utils.generate_uuid()))
|
||||
mock_get_zone(object, self.get_context(),
|
||||
mock_get_zone(mock.Mock(coordination=FakeCoordination()),
|
||||
self.get_context(),
|
||||
zone=zone.Zone(id=utils.generate_uuid()))
|
||||
|
||||
def test_synchronized_zone_raises_exception_when_no_zone_provided(self):
|
||||
|
|
|
@ -256,6 +256,11 @@ function install_designate {
|
|||
git_clone $DESIGNATE_REPO $DESIGNATE_DIR $DESIGNATE_BRANCH
|
||||
setup_develop $DESIGNATE_DIR
|
||||
|
||||
# Install reqs for tooz driver
|
||||
if [[ "$DESIGNATE_COORDINATION_URL" =~ "memcached" ]]; then
|
||||
pip_install_gr "pymemcache"
|
||||
fi
|
||||
|
||||
install_designate_backend
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue