Adding distributed locking to central

The current locking implementation is limited to the running
process. This change introduces distributed locking to prevent
race conditions when multiple instances of designate-central are
running.

Closes-Bug: #1871332
Change-Id: I98f7f80ce365cdee33528f9964c03274f62a795a
Signed-off-by: Nicolas Bock <nicolas.bock@canonical.com>
(cherry picked from commit f6090d885c)
Author: Erik Olof Gunnar Andersson 2020-04-06 22:39:36 -07:00
Committer: Nicolas Bock
Parent: 656a26c45c
Commit: acc82755f1
5 changed files with 194 additions and 2 deletions
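
For context, a minimal sketch (not part of this change) of the gap being
closed: an oslo.concurrency lock only serializes callers inside one process,
while a tooz lock taken through a coordinator backed by a shared store is
visible to every designate-central instance. The memcached URL, member id,
and lock name below are illustrative placeholders.

    from oslo_concurrency import lockutils
    import tooz.coordination

    # Process-local: serializes threads in this process only; a second
    # designate-central on another host acquires the "same" lock
    # independently.
    with lockutils.lock('zone-2ae0ab2a'):
        pass

    # Distributed: the same lock name acquired through a tooz coordinator
    # is shared by every instance pointing at the same backend.
    coordinator = tooz.coordination.get_coordinator(
        'memcached://127.0.0.1:11211',  # illustrative backend URL
        b'central-host-a')              # illustrative member id
    coordinator.start(start_heart=True)
    with coordinator.get_lock(b'zone-2ae0ab2a'):
        pass  # critical section, safe across instances
    coordinator.stop()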

View File

@@ -33,9 +33,9 @@ from dns import exception as dnsexception
 from oslo_config import cfg
 import oslo_messaging as messaging
 from oslo_log import log as logging
-from oslo_concurrency import lockutils
 from designate import context as dcontext
+from designate import coordination
 from designate import exceptions
 from designate import dnsutils
 from designate import network_api
@@ -117,7 +117,7 @@ def synchronized_zone(zone_arg=1, new_zone=False):
             if zone_id in ZONE_LOCKS.held:
                 return f(self, *args, **kwargs)
 
-            with lockutils.lock(lock_name):
+            with self.coordination.get_lock(lock_name):
                 try:
                     ZONE_LOCKS.held.add(zone_id)
                     return f(self, *args, **kwargs)
@@ -191,6 +191,10 @@ class Service(service.RPCService, service.Service):
     def __init__(self, threads=None):
         super(Service, self).__init__(threads=threads)
 
+        self.coordination = coordination.Coordination(
+            self.service_name, self.tg
+        )
+
         self.network_api = network_api.get_network_api(cfg.CONF.network_api)
 
         # update_service_status is called by the emitter so we pass
@@ -231,8 +235,10 @@ class Service(service.RPCService, service.Service):
                              "configured")
 
         super(Service, self).start()
+        self.coordination.start()
 
     def stop(self):
+        self.coordination.stop()
         super(Service, self).stop()
 
     @property
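
The decorator change above swaps lockutils.lock() for
self.coordination.get_lock(), so zone operations pick up a distributed lock
whenever a coordinator is running. A simplified sketch of that pattern
follows; names mirror the diff, but this is an illustration, not the full
synchronized_zone() implementation.

    import functools


    def synchronized(lock_name):
        # Simplified stand-in for synchronized_zone(): the lock now comes
        # from the service's Coordination helper instead of lockutils, so
        # it is distributed whenever a coordination backend is configured
        # and falls back to a process-local lock otherwise.
        def wrapper(f):
            @functools.wraps(f)
            def wrapped(self, *args, **kwargs):
                with self.coordination.get_lock(lock_name):
                    return f(self, *args, **kwargs)
            return wrapped
        return wrapper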

View File

@@ -20,6 +20,7 @@ import math
 import time
 
 from oslo_config import cfg
+from oslo_concurrency import lockutils
 from oslo_log import log
 import tenacity
 import tooz.coordination
@@ -59,6 +60,99 @@ def _retry_if_tooz_error(exception):
     return isinstance(exception, tooz.coordination.ToozError)
 
 
+class Coordination(object):
+    def __init__(self, name, tg, grouping_enabled=False):
+        # NOTE(eandersson): Workaround until tooz handles the conversion.
+        if not isinstance(name, bytes):
+            name = name.encode('ascii')
+        self.name = name
+        self.tg = tg
+        self.coordination_id = None
+        self._grouping_enabled = grouping_enabled
+        self._coordinator = None
+        self._started = False
+
+    @property
+    def coordinator(self):
+        return self._coordinator
+
+    @property
+    def started(self):
+        return self._started
+
+    def get_lock(self, name):
+        if self._coordinator:
+            # NOTE(eandersson): Workaround until tooz handles the conversion.
+            if not isinstance(name, bytes):
+                name = name.encode('ascii')
+            return self._coordinator.get_lock(name)
+        return lockutils.lock(name)
+
+    def start(self):
+        self.coordination_id = ":".join([CONF.host, generate_uuid()])
+        self._started = False
+
+        backend_url = CONF.coordination.backend_url
+        if backend_url is None:
+            LOG.warning('No coordination backend configured, distributed '
+                        'coordination functionality will be disabled. '
+                        'Please configure a coordination backend.')
+            return
+
+        self._coordinator = tooz.coordination.get_coordinator(
+            backend_url, self.coordination_id.encode()
+        )
+        while not self._coordinator.is_started:
+            self._coordinator.start(start_heart=True)
+
+        self._started = True
+
+        if self._grouping_enabled:
+            self._enable_grouping()
+
+    def stop(self):
+        if self._coordinator is None:
+            return
+
+        try:
+            if self._grouping_enabled:
+                self._disable_grouping()
+            self._coordinator.stop()
+            self._coordinator = None
+        finally:
+            self._started = False
+
+    def _coordinator_run_watchers(self):
+        if not self._started:
+            return
+        self._coordinator.run_watchers()
+
+    def _create_group(self):
+        try:
+            create_group_req = self._coordinator.create_group(
+                self.name
+            )
+            create_group_req.get()
+        except tooz.coordination.GroupAlreadyExist:
+            pass
+        join_group_req = self._coordinator.join_group(self.name)
+        join_group_req.get()
+
+    def _disable_grouping(self):
+        try:
+            leave_group_req = self._coordinator.leave_group(self.name)
+            leave_group_req.get()
+        except tooz.coordination.GroupNotCreated:
+            pass
+
+    def _enable_grouping(self):
+        self._create_group()
+        self.tg.add_timer(
+            CONF.coordination.run_watchers_interval,
+            self._coordinator_run_watchers
+        )
+
+
 class CoordinationMixin(object):
     def __init__(self, *args, **kwargs):
         super(CoordinationMixin, self).__init__(*args, **kwargs)
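
Taken together, the lifecycle a consumer of this helper is expected to
follow is start(), get_lock() around critical sections, then stop(). A
sketch under the assumption that Designate's configuration (including the
[coordination] section) has already been loaded; the ThreadGroup wiring and
lock name are illustrative.

    from oslo_service import threadgroup

    from designate import coordination

    tg = threadgroup.ThreadGroup()
    coord = coordination.Coordination('central', tg)

    # With [coordination]/backend_url unset, start() logs a warning and
    # get_lock() degrades to an oslo.concurrency process-local lock; with
    # a backend configured it returns a tooz distributed lock.
    coord.start()
    with coord.get_lock('create-zone-example.org.'):
        pass  # serialized across every central instance sharing the backend
    coord.stop()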

View File

@@ -0,0 +1,74 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import mock
+
+from oslo_concurrency import lockutils
+from oslo_log import log as logging
+
+from designate import exceptions
+from designate import utils
+from designate.central import service
+from designate.objects import record
+from designate.objects import zone
+from designate.tests.test_central import CentralTestCase
+
+LOG = logging.getLogger(__name__)
+
+
+class FakeCoordination(object):
+    def get_lock(self, name):
+        return lockutils.lock(name)
+
+
+class CentralDecoratorTests(CentralTestCase):
+    def test_synchronized_zone_exception_raised(self):
+        @service.synchronized_zone()
+        def mock_get_zone(cls, index, zone):
+            self.assertEqual(service.ZONE_LOCKS.held, {zone.id})
+            if index % 3 == 0:
+                raise exceptions.ZoneNotFound()
+
+        for index in range(9):
+            try:
+                mock_get_zone(mock.Mock(coordination=FakeCoordination()),
+                              index,
+                              zone.Zone(id=utils.generate_uuid()))
+            except exceptions.ZoneNotFound:
+                pass
+
+    def test_synchronized_zone_recursive_decorator_call(self):
+        @service.synchronized_zone()
+        def mock_create_record(cls, context, record):
+            self.assertEqual(service.ZONE_LOCKS.held, {record.zone_id})
+            mock_get_zone(cls, context, zone.Zone(id=record.zone_id))
+
+        @service.synchronized_zone()
+        def mock_get_zone(cls, context, zone):
+            self.assertEqual(service.ZONE_LOCKS.held, {zone.id})
+
+        mock_create_record(mock.Mock(coordination=FakeCoordination()),
+                           self.get_context(),
+                           record=record.Record(zone_id=utils.generate_uuid()))
+        mock_get_zone(mock.Mock(coordination=FakeCoordination()),
+                      self.get_context(),
+                      zone=zone.Zone(id=utils.generate_uuid()))
+
+    def test_synchronized_zone_raises_exception_when_no_zone_provided(self):
+        @service.synchronized_zone(new_zone=False)
+        def mock_not_creating_new_zone(cls, context, record):
+            pass
+
+        self.assertRaisesRegexp(
+            Exception,
+            'Failed to determine zone id for synchronized operation',
+            mock_not_creating_new_zone, self.get_context(), None
+        )

View File

@@ -251,6 +251,11 @@ function install_designate {
     git_clone $DESIGNATE_REPO $DESIGNATE_DIR $DESIGNATE_BRANCH
     setup_develop $DESIGNATE_DIR
 
+    # Install reqs for tooz driver
+    if [[ "$DESIGNATE_COORDINATION_URL" =~ "memcached" ]]; then
+        pip_install_gr "pymemcache"
+    fi
+
     install_designate_backend
 }
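
The devstack hook installs pymemcache only when the coordination URL points
at memcached, because tooz's memcached driver requires that package at
runtime. A sketch of the option as a standalone script might consume it
(Designate itself registers this option; the URL is illustrative):

    from oslo_config import cfg

    CONF = cfg.CONF
    CONF.register_opts(
        [cfg.StrOpt('backend_url', help='tooz coordination backend URL')],
        group='coordination')

    # Equivalent designate.conf snippet:
    #   [coordination]
    #   backend_url = memcached://127.0.0.1:11211
    CONF.set_override('backend_url', 'memcached://127.0.0.1:11211',
                      group='coordination')
    print(CONF.coordination.backend_url)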

View File

@@ -0,0 +1,13 @@
+---
+fixes:
+  - |
+    Adding distributed locking to central
+
+    The central service was not using a global lock, which could lead to
+    a race condition in a high-availability setup and to missing
+    recordsets in the DNS backend. This release includes a partial
+    backport of a distributed locking mechanism to central.
+
+    Fixes `bug 1871332`_
+
+    .. _bug 1871332: https://bugs.launchpad.net/designate/+bug/1871332