Remove etcd3 drvier
This driver was deprecated in the 2.11.0 release[1]. The 3.2.0 release
which contains this deprecation is used in upper-constraints of
stable/2023.1 so we are ready to remove this feature now.
[1] 7ee2780af8
Change-Id: I5ca2fe43cb25b4687ace6bd21a866b74f289d628
This commit is contained in:
parent
7007c774da
commit
6bc02cda5b
@ -7,10 +7,6 @@ reno>=3.1.0 # Apache-2.0
|
||||
python-consul2>=0.0.16 # MIT License
|
||||
## etcd
|
||||
requests>=2.10.0 # Apache-2.0
|
||||
## etcd3
|
||||
etcd3>=0.12.0 # Apache-2.0
|
||||
grpcio>=1.18.0
|
||||
protobuf # BSD License (3 clause)
|
||||
## etcd3gw
|
||||
etcd3gw!=0.2.6,>=0.1.0 # Apache-2.0
|
||||
## zake
|
||||
|
@ -20,12 +20,6 @@ Etcd
|
||||
.. autoclass:: tooz.drivers.etcd.EtcdDriver
|
||||
:members:
|
||||
|
||||
Etcd3
|
||||
~~~~~
|
||||
|
||||
.. autoclass:: tooz.drivers.etcd3.Etcd3Driver
|
||||
:members:
|
||||
|
||||
Etcd3gw
|
||||
~~~~~~~
|
||||
|
||||
|
@ -198,21 +198,6 @@ The etcd driver is a driver providing only distributed locks (for now)
|
||||
and is based on the `etcd server`_ supported key/value storage and
|
||||
associated primitives.
|
||||
|
||||
Etcd3
|
||||
-----
|
||||
|
||||
**Driver:** :py:class:`tooz.drivers.etcd3.Etcd3Driver`
|
||||
|
||||
**Characteristics:** :py:attr:`tooz.drivers.etcd3.Etcd3Driver.CHARACTERISTICS`
|
||||
|
||||
**Entrypoint name:** ``etcd3``
|
||||
|
||||
**Summary:**
|
||||
|
||||
The etcd3 driver is a driver providing only distributed locks (for now)
|
||||
and is based on the `etcd server`_ supported key/value storage and
|
||||
associated primitives.
|
||||
|
||||
Etcd3 Gateway
|
||||
-------------
|
||||
|
||||
|
@ -0,0 +1,4 @@
|
||||
---
|
||||
upgrade:
|
||||
- |
|
||||
The etcd3 driver has been removed.
|
@ -29,7 +29,6 @@ packages =
|
||||
[entry_points]
|
||||
tooz.backends =
|
||||
etcd = tooz.drivers.etcd:EtcdDriver
|
||||
etcd3 = tooz.drivers.etcd3:Etcd3Driver
|
||||
etcd3+http = tooz.drivers.etcd3gw:Etcd3Driver
|
||||
etcd3+https = tooz.drivers.etcd3gw:Etcd3Driver
|
||||
kazoo = tooz.drivers.zookeeper:KazooDriver
|
||||
@ -48,10 +47,6 @@ consul =
|
||||
python-consul2>=0.0.16 # MIT License
|
||||
etcd =
|
||||
requests>=2.10.0 # Apache-2.0
|
||||
etcd3 =
|
||||
etcd3>=0.12.0 # Apache-2.0
|
||||
grpcio>=1.18.0
|
||||
protobuf<4.0 # BSD License (3 clause)
|
||||
etcd3gw =
|
||||
etcd3gw!=0.2.6,>=0.1.0 # Apache-2.0
|
||||
zake =
|
||||
|
@ -1,372 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import contextlib
|
||||
import functools
|
||||
import threading
|
||||
|
||||
import etcd3
|
||||
from etcd3 import exceptions as etcd3_exc
|
||||
from oslo_utils import encodeutils
|
||||
|
||||
import tooz
|
||||
from tooz import _retry
|
||||
from tooz import coordination
|
||||
from tooz import locking
|
||||
from tooz import utils
|
||||
|
||||
import warnings
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _failure_translator():
|
||||
"""Translates common requests exceptions into tooz exceptions."""
|
||||
try:
|
||||
yield
|
||||
except etcd3_exc.ConnectionFailedError as e:
|
||||
utils.raise_with_cause(coordination.ToozConnectionError,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
cause=e)
|
||||
except etcd3_exc.ConnectionTimeoutError as e:
|
||||
utils.raise_with_cause(coordination.OperationTimedOut,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
cause=e)
|
||||
except etcd3_exc.Etcd3Exception as e:
|
||||
utils.raise_with_cause(coordination.ToozError,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
cause=e)
|
||||
|
||||
|
||||
def _translate_failures(func):
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
with _failure_translator():
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class Etcd3Lock(locking.Lock):
|
||||
"""An etcd3-specific lock.
|
||||
|
||||
Thin wrapper over etcd3's lock object basically to provide the heartbeat()
|
||||
semantics for the coordination driver.
|
||||
"""
|
||||
|
||||
LOCK_PREFIX = b"/tooz/locks"
|
||||
|
||||
def __init__(self, coord, name, timeout):
|
||||
super(Etcd3Lock, self).__init__(name)
|
||||
self._coord = coord
|
||||
self._lock = coord.client.lock(name.decode(), timeout)
|
||||
self._exclusive_access = threading.Lock()
|
||||
|
||||
@_translate_failures
|
||||
def acquire(self, blocking=True, shared=False):
|
||||
if shared:
|
||||
raise tooz.NotImplemented
|
||||
|
||||
blocking, timeout = utils.convert_blocking(blocking)
|
||||
if blocking is False:
|
||||
timeout = 0
|
||||
|
||||
if self._lock.acquire(timeout):
|
||||
self._coord._acquired_locks.add(self)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@property
|
||||
def acquired(self):
|
||||
return self in self._coord._acquired_locks
|
||||
|
||||
@_translate_failures
|
||||
def release(self):
|
||||
with self._exclusive_access:
|
||||
if self.acquired and self._lock.release():
|
||||
self._coord._acquired_locks.discard(self)
|
||||
return True
|
||||
return False
|
||||
|
||||
@_translate_failures
|
||||
def heartbeat(self):
|
||||
with self._exclusive_access:
|
||||
if self.acquired:
|
||||
self._lock.refresh()
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class Etcd3Driver(coordination.CoordinationDriverCachedRunWatchers,
|
||||
coordination.CoordinationDriverWithExecutor):
|
||||
"""An etcd based driver.
|
||||
|
||||
This driver uses etcd provide the coordination driver semantics and
|
||||
required API(s).
|
||||
|
||||
The Etcd driver connection URI should look like::
|
||||
|
||||
etcd3://[HOST[:PORT]][?OPTION1=VALUE1[&OPTION2=VALUE2[&...]]]
|
||||
|
||||
If not specified, HOST defaults to localhost and PORT defaults to 2379.
|
||||
Available options are:
|
||||
|
||||
================== =======
|
||||
Name Default
|
||||
================== =======
|
||||
ca_cert None
|
||||
cert_key None
|
||||
cert_cert None
|
||||
timeout 30
|
||||
lock_timeout 30
|
||||
membership_timeout 30
|
||||
================== =======
|
||||
"""
|
||||
|
||||
#: Default socket/lock/member/leader timeout used when none is provided.
|
||||
DEFAULT_TIMEOUT = 30
|
||||
|
||||
#: Default hostname used when none is provided.
|
||||
DEFAULT_HOST = "localhost"
|
||||
|
||||
#: Default port used if none provided (4001 or 2379 are the common ones).
|
||||
DEFAULT_PORT = 2379
|
||||
|
||||
def __init__(self, member_id, parsed_url, options):
|
||||
super(Etcd3Driver, self).__init__(member_id, parsed_url, options)
|
||||
warnings.warn(
|
||||
"The etcd3 tooz driver is deprecated, it will be removed in"
|
||||
"a future release."
|
||||
)
|
||||
host = parsed_url.hostname or self.DEFAULT_HOST
|
||||
port = parsed_url.port or self.DEFAULT_PORT
|
||||
options = utils.collapse(options)
|
||||
ca_cert = options.get('ca_cert')
|
||||
cert_key = options.get('cert_key')
|
||||
cert_cert = options.get('cert_cert')
|
||||
timeout = int(options.get('timeout', self.DEFAULT_TIMEOUT))
|
||||
self.client = etcd3.client(host=host,
|
||||
port=port,
|
||||
ca_cert=ca_cert,
|
||||
cert_key=cert_key,
|
||||
cert_cert=cert_cert,
|
||||
timeout=timeout)
|
||||
self.lock_timeout = int(options.get('lock_timeout', timeout))
|
||||
self.membership_timeout = int(options.get(
|
||||
'membership_timeout', timeout))
|
||||
self._acquired_locks = set()
|
||||
|
||||
def _start(self):
|
||||
super(Etcd3Driver, self)._start()
|
||||
self._membership_lease = self.client.lease(self.membership_timeout)
|
||||
|
||||
def _stop(self):
|
||||
super(Etcd3Driver, self)._stop()
|
||||
self._membership_lease.revoke()
|
||||
|
||||
def get_lock(self, name):
|
||||
return Etcd3Lock(self, name, self.lock_timeout)
|
||||
|
||||
def heartbeat(self):
|
||||
# NOTE(jaypipes): Copying because set can mutate during iteration
|
||||
for lock in self._acquired_locks.copy():
|
||||
lock.heartbeat()
|
||||
# TODO(jd) use the same lease for locks?
|
||||
self._membership_lease.refresh()
|
||||
return min(self.lock_timeout, self.membership_timeout)
|
||||
|
||||
GROUP_PREFIX = b"tooz/groups/"
|
||||
|
||||
def _encode_group_id(self, group_id):
|
||||
return self.GROUP_PREFIX + utils.to_binary(group_id) + b"/"
|
||||
|
||||
def _encode_group_member_id(self, group_id, member_id):
|
||||
return self._encode_group_id(group_id) + utils.to_binary(member_id)
|
||||
|
||||
def create_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
|
||||
@_translate_failures
|
||||
def _create_group():
|
||||
status, results = self.client.transaction(
|
||||
compare=[
|
||||
self.client.transactions.version(
|
||||
encoded_group) == 0
|
||||
],
|
||||
success=[
|
||||
self.client.transactions.put(encoded_group, b"")
|
||||
],
|
||||
failure=[],
|
||||
)
|
||||
if not status:
|
||||
raise coordination.GroupAlreadyExist(group_id)
|
||||
|
||||
return EtcdFutureResult(self._executor.submit(_create_group))
|
||||
|
||||
def _destroy_group(self, group_id):
|
||||
self.client.delete(self._encode_group_id(group_id))
|
||||
|
||||
def delete_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
|
||||
@_translate_failures
|
||||
def _delete_group():
|
||||
members = list(self.client.get_prefix(encoded_group))
|
||||
if len(members) > 1:
|
||||
raise coordination.GroupNotEmpty(group_id)
|
||||
|
||||
# Warning: as of this writing python-etcd3 does not support the
|
||||
# NOT_EQUAL operator so we use the EQUAL operator and will retry on
|
||||
# success, hihi
|
||||
status, results = self.client.transaction(
|
||||
compare=[
|
||||
self.client.transactions.version(encoded_group) == 0
|
||||
],
|
||||
success=[],
|
||||
failure=[
|
||||
self.client.transactions.delete(encoded_group)
|
||||
],
|
||||
)
|
||||
if status:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
|
||||
return EtcdFutureResult(self._executor.submit(_delete_group))
|
||||
|
||||
def join_group(self, group_id, capabilities=b""):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
|
||||
@_retry.retry()
|
||||
@_translate_failures
|
||||
def _join_group():
|
||||
members = list(self.client.get_prefix(encoded_group))
|
||||
|
||||
encoded_member = self._encode_group_member_id(
|
||||
group_id, self._member_id)
|
||||
|
||||
group_metadata = None
|
||||
for cap, metadata in members:
|
||||
if metadata.key == encoded_member:
|
||||
raise coordination.MemberAlreadyExist(group_id,
|
||||
self._member_id)
|
||||
if metadata.key == encoded_group:
|
||||
group_metadata = metadata
|
||||
|
||||
if group_metadata is None:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
|
||||
status, results = self.client.transaction(
|
||||
# This comparison makes sure the group has not been deleted in
|
||||
# the mean time
|
||||
compare=[
|
||||
self.client.transactions.version(encoded_group) ==
|
||||
group_metadata.version
|
||||
],
|
||||
success=[
|
||||
self.client.transactions.put(encoded_member,
|
||||
utils.dumps(capabilities),
|
||||
lease=self._membership_lease)
|
||||
],
|
||||
failure=[],
|
||||
)
|
||||
if not status:
|
||||
# TODO(jd) There's a small optimization doable by getting the
|
||||
# current range on failure and passing it to this function as
|
||||
# the first arg when retrying to avoid redoing a get_prefix()
|
||||
raise _retry.TryAgain
|
||||
|
||||
return EtcdFutureResult(self._executor.submit(_join_group))
|
||||
|
||||
def leave_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
|
||||
@_translate_failures
|
||||
def _leave_group():
|
||||
members = list(self.client.get_prefix(encoded_group))
|
||||
|
||||
encoded_member = self._encode_group_member_id(
|
||||
group_id, self._member_id)
|
||||
|
||||
for capabilities, metadata in members:
|
||||
if metadata.key == encoded_member:
|
||||
break
|
||||
else:
|
||||
raise coordination.MemberNotJoined(group_id,
|
||||
self._member_id)
|
||||
|
||||
self.client.delete(encoded_member)
|
||||
|
||||
return EtcdFutureResult(self._executor.submit(_leave_group))
|
||||
|
||||
def get_members(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
|
||||
@_translate_failures
|
||||
def _get_members():
|
||||
members = set()
|
||||
group_found = False
|
||||
|
||||
for cap, metadata in self.client.get_prefix(encoded_group):
|
||||
if metadata.key == encoded_group:
|
||||
group_found = True
|
||||
else:
|
||||
members.add(metadata.key[len(encoded_group):])
|
||||
|
||||
if not group_found:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
|
||||
return members
|
||||
|
||||
return EtcdFutureResult(self._executor.submit(_get_members))
|
||||
|
||||
def get_member_capabilities(self, group_id, member_id):
|
||||
encoded_member = self._encode_group_member_id(
|
||||
group_id, member_id)
|
||||
|
||||
@_translate_failures
|
||||
def _get_member_capabilities():
|
||||
capabilities, metadata = self.client.get(encoded_member)
|
||||
if capabilities is None:
|
||||
raise coordination.MemberNotJoined(group_id, member_id)
|
||||
return utils.loads(capabilities)
|
||||
|
||||
return EtcdFutureResult(
|
||||
self._executor.submit(_get_member_capabilities))
|
||||
|
||||
def update_capabilities(self, group_id, capabilities):
|
||||
encoded_member = self._encode_group_member_id(
|
||||
group_id, self._member_id)
|
||||
|
||||
@_translate_failures
|
||||
def _update_capabilities():
|
||||
cap, metadata = self.client.get(encoded_member)
|
||||
if cap is None:
|
||||
raise coordination.MemberNotJoined(group_id, self._member_id)
|
||||
|
||||
self.client.put(encoded_member, utils.dumps(capabilities),
|
||||
lease=self._membership_lease)
|
||||
|
||||
return EtcdFutureResult(
|
||||
self._executor.submit(_update_capabilities))
|
||||
|
||||
@staticmethod
|
||||
def watch_elected_as_leader(group_id, callback):
|
||||
raise tooz.NotImplemented
|
||||
|
||||
@staticmethod
|
||||
def unwatch_elected_as_leader(group_id, callback):
|
||||
raise tooz.NotImplemented
|
||||
|
||||
|
||||
EtcdFutureResult = functools.partial(coordination.CoordinatorResult,
|
||||
failure_translator=_failure_translator)
|
@ -55,8 +55,6 @@ class TestWithCoordinator(testcase.TestCase, metaclass=SkipNotImplementedMeta):
|
||||
super(TestWithCoordinator, self).setUp()
|
||||
if self.url is None:
|
||||
raise RuntimeError("No URL set for this driver")
|
||||
if os.getenv("TOOZ_TEST_ETCD3"):
|
||||
self.url = self.url.replace("etcd://", "etcd3://")
|
||||
if os.getenv("TOOZ_TEST_ETCD3GW"):
|
||||
# TODO(jan.gutter): When pifpaf supports etcd 3.4 we should use the
|
||||
# defaults
|
||||
|
@ -1,63 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright 2020 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import ddt
|
||||
from testtools import testcase
|
||||
from unittest import mock
|
||||
|
||||
import tooz.coordination
|
||||
import tooz.drivers.etcd3 as etcd3_driver
|
||||
import tooz.tests
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestEtcd3(testcase.TestCase):
|
||||
FAKE_MEMBER_ID = tooz.tests.get_random_uuid()
|
||||
|
||||
@ddt.data({'coord_url': 'etcd3://',
|
||||
'host': etcd3_driver.Etcd3Driver.DEFAULT_HOST,
|
||||
'port': etcd3_driver.Etcd3Driver.DEFAULT_PORT,
|
||||
'ca_cert': None,
|
||||
'cert_key': None,
|
||||
'cert_cert': None,
|
||||
'timeout': etcd3_driver.Etcd3Driver.DEFAULT_TIMEOUT},
|
||||
{'coord_url': ('etcd3://my_host:666?ca_cert=/my/ca_cert&'
|
||||
'cert_key=/my/cert_key&cert_cert=/my/cert_cert&'
|
||||
'timeout=42'),
|
||||
'host': 'my_host',
|
||||
'port': 666,
|
||||
'ca_cert': '/my/ca_cert',
|
||||
'cert_key': '/my/cert_key',
|
||||
'cert_cert': '/my/cert_cert',
|
||||
'timeout': 42})
|
||||
@ddt.unpack
|
||||
@mock.patch('etcd3.client')
|
||||
def test_etcd3_client_init(self,
|
||||
mock_etcd3_client,
|
||||
coord_url,
|
||||
host,
|
||||
port,
|
||||
ca_cert,
|
||||
cert_key,
|
||||
cert_cert,
|
||||
timeout):
|
||||
tooz.coordination.get_coordinator(coord_url, self.FAKE_MEMBER_ID)
|
||||
mock_etcd3_client.assert_called_with(host=host,
|
||||
port=port,
|
||||
ca_cert=ca_cert,
|
||||
cert_key=cert_key,
|
||||
cert_cert=cert_cert,
|
||||
timeout=timeout)
|
@ -818,22 +818,10 @@ class TestAPI(tests.TestWithCoordinator):
|
||||
self.assertTrue(f.result())
|
||||
|
||||
def test_get_lock_concurrency_locking_two_lock_process(self):
|
||||
# NOTE(jd) Using gRPC and forking is not supported so this test might
|
||||
# very likely hang forever or crash. See
|
||||
# https://github.com/grpc/grpc/issues/10140#issuecomment-297548714 for
|
||||
# more info.
|
||||
if self.url.startswith("etcd3://"):
|
||||
self.skipTest("Unable to use etcd3 with fork()")
|
||||
self._do_test_get_lock_concurrency_locking_two_lock(
|
||||
futures.ProcessPoolExecutor, False)
|
||||
|
||||
def test_get_lock_serial_locking_two_lock_process(self):
|
||||
# NOTE(jd) Using gRPC and forking is not supported so this test might
|
||||
# very likely hang forever or crash. See
|
||||
# https://github.com/grpc/grpc/issues/10140#issuecomment-297548714 for
|
||||
# more info.
|
||||
if self.url.startswith("etcd3://"):
|
||||
self.skipTest("Unable to use etcd3 with fork()")
|
||||
self._do_test_get_lock_serial_locking_two_lock(
|
||||
futures.ProcessPoolExecutor, False)
|
||||
|
||||
|
7
tox.ini
7
tox.ini
@ -1,13 +1,13 @@
|
||||
[tox]
|
||||
minversion = 3.1.0
|
||||
envlist = py3,py{36,38}-{zookeeper,redis,sentinel,memcached,postgresql,mysql,consul,etcd,etcd3,etcd3gw},pep8
|
||||
envlist = py3,py{36,38}-{zookeeper,redis,sentinel,memcached,postgresql,mysql,consul,etcd,etcd3gw},pep8
|
||||
ignore_basepython_conflict = True
|
||||
|
||||
[testenv]
|
||||
basepython = python3
|
||||
# We need to install a bit more than just `test-requirements' because those drivers have
|
||||
# custom tests that we always run
|
||||
deps = .[zake,ipc,memcached,mysql,etcd,etcd3,etcd3gw]
|
||||
deps = .[zake,ipc,memcached,mysql,etcd,etcd3gw]
|
||||
zookeeper: .[zookeeper]
|
||||
redis: .[redis]
|
||||
sentinel: .[redis]
|
||||
@ -15,7 +15,6 @@ deps = .[zake,ipc,memcached,mysql,etcd,etcd3,etcd3gw]
|
||||
postgresql: .[postgresql]
|
||||
mysql: .[mysql]
|
||||
etcd: .[etcd]
|
||||
etcd3: .[etcd3]
|
||||
etcd3gw: .[etcd3gw]
|
||||
consul: .[consul]
|
||||
-r{toxinidir}/test-requirements.txt
|
||||
@ -28,8 +27,6 @@ setenv =
|
||||
mysql: TOOZ_TEST_DRIVERS = mysql
|
||||
postgresql: TOOZ_TEST_DRIVERS = postgresql
|
||||
etcd: TOOZ_TEST_DRIVERS = etcd,etcd --cluster
|
||||
etcd3: TOOZ_TEST_DRIVERS = etcd
|
||||
etcd3: TOOZ_TEST_ETCD3 = 1
|
||||
etcd3gw: TOOZ_TEST_DRIVERS = etcd
|
||||
etcd3gw: TOOZ_TEST_ETCD3GW = 1
|
||||
consul: TOOZ_TEST_DRIVERS = consul
|
||||
|
Loading…
Reference in New Issue
Block a user