Separate etcd3gw driver that uses the etcd3 grpc gateway
Using grpc to directly access etcd3 does not work well with eventlet based services. So we need to use the grpc HTTP gateway API (/v3alpha). https://coreos.com/etcd/docs/latest/dev-guide/api_grpc_gateway.html For this we use the etcd3gw package: https://pypi.python.org/pypi/etcd3gw The structure of the code is very similar to the one we already have that uses the etcd3 package. Change-Id: I97bd7ffb05a7e40cb08c9b9d62cc45236ad292aa
This commit is contained in:
parent
24101cf264
commit
6ec73a0bed
@ -27,6 +27,7 @@ packages =
|
||||
tooz.backends =
|
||||
etcd = tooz.drivers.etcd:EtcdDriver
|
||||
etcd3 = tooz.drivers.etcd3:Etcd3Driver
|
||||
etcd3+http = tooz.drivers.etcd3gw:Etcd3Driver
|
||||
kazoo = tooz.drivers.zookeeper:KazooDriver
|
||||
zake = tooz.drivers.zake:ZakeDriver
|
||||
memcached = tooz.drivers.memcached:MemcachedDriver
|
||||
@ -45,6 +46,8 @@ etcd =
|
||||
requests>=2.10.0 # Apache-2.0
|
||||
etcd3 =
|
||||
etcd3>=0.5.1 # Apache-2.0
|
||||
etcd3gw =
|
||||
etcd3gw>=0.1.0 # Apache-2.0
|
||||
zake =
|
||||
zake>=0.1.6 # Apache-2.0
|
||||
redis =
|
||||
|
193
tooz/drivers/etcd3gw.py
Normal file
193
tooz/drivers/etcd3gw.py
Normal file
@ -0,0 +1,193 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from __future__ import absolute_import
|
||||
import base64
|
||||
import uuid
|
||||
|
||||
import etcd3gw
|
||||
from etcd3gw import exceptions as etcd3_exc
|
||||
from oslo_utils import encodeutils
|
||||
import six
|
||||
|
||||
import tooz
|
||||
from tooz import _retry
|
||||
from tooz import coordination
|
||||
from tooz import locking
|
||||
from tooz import utils
|
||||
|
||||
|
||||
def _translate_failures(func):
|
||||
"""Translates common requests exceptions into tooz exceptions."""
|
||||
|
||||
@six.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except etcd3_exc.ConnectionFailedError as e:
|
||||
utils.raise_with_cause(coordination.ToozConnectionError,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
cause=e)
|
||||
except etcd3_exc.ConnectionTimeoutError as e:
|
||||
utils.raise_with_cause(coordination.OperationTimedOut,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
cause=e)
|
||||
except etcd3_exc.Etcd3Exception as e:
|
||||
utils.raise_with_cause(coordination.ToozError,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
cause=e)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class Etcd3Lock(locking.Lock):
|
||||
"""An etcd3-specific lock.
|
||||
|
||||
Thin wrapper over etcd3's lock object basically to provide the heartbeat()
|
||||
semantics for the coordination driver.
|
||||
"""
|
||||
|
||||
LOCK_PREFIX = b"/tooz/locks"
|
||||
|
||||
def __init__(self, coord, name, timeout):
|
||||
super(Etcd3Lock, self).__init__(name)
|
||||
self._timeout = timeout
|
||||
self._coord = coord
|
||||
self._key = self.LOCK_PREFIX + name
|
||||
self._key_b64 = base64.b64encode(self._key).decode("ascii")
|
||||
self._uuid = base64.b64encode(uuid.uuid4().bytes).decode("ascii")
|
||||
self._lease = self._coord.client.lease(self._timeout)
|
||||
|
||||
@_translate_failures
|
||||
def acquire(self, blocking=True, shared=False):
|
||||
if shared:
|
||||
raise tooz.NotImplemented
|
||||
|
||||
@_retry.retry(stop_max_delay=blocking)
|
||||
def _acquire():
|
||||
# TODO(jd): save the created revision so we can check it later to
|
||||
# make sure we still have the lock
|
||||
txn = {
|
||||
'compare': [{
|
||||
'key': self._key_b64,
|
||||
'result': 'EQUAL',
|
||||
'target': 'CREATE',
|
||||
'create_revision': 0
|
||||
}],
|
||||
'success': [{
|
||||
'request_put': {
|
||||
'key': self._key_b64,
|
||||
'value': self._uuid,
|
||||
'lease': self._lease.id
|
||||
}
|
||||
}],
|
||||
'failure': [{
|
||||
'request_range': {
|
||||
'key': self._key_b64
|
||||
}
|
||||
}]
|
||||
}
|
||||
result = self._coord.client.transaction(txn)
|
||||
success = result.get('succeeded', False)
|
||||
|
||||
if success is not True:
|
||||
if blocking is False:
|
||||
return False
|
||||
raise _retry.TryAgain
|
||||
self._coord._acquired_locks.add(self)
|
||||
return True
|
||||
|
||||
return _acquire()
|
||||
|
||||
@_translate_failures
|
||||
def release(self):
|
||||
txn = {
|
||||
'compare': [{
|
||||
'key': self._key_b64,
|
||||
'result': 'EQUAL',
|
||||
'target': 'VALUE',
|
||||
'value': self._uuid
|
||||
}],
|
||||
'success': [{
|
||||
'request_delete_range': {
|
||||
'key': self._key_b64
|
||||
}
|
||||
}]
|
||||
}
|
||||
|
||||
result = self._coord.client.transaction(txn)
|
||||
success = result.get('succeeded', False)
|
||||
if success:
|
||||
self._coord._acquired_locks.remove(self)
|
||||
return True
|
||||
return False
|
||||
|
||||
@_translate_failures
|
||||
def break_(self):
|
||||
if self._coord.client.delete(self._key):
|
||||
self._coord._acquired_locks.discard(self)
|
||||
return True
|
||||
return False
|
||||
|
||||
@_translate_failures
|
||||
def heartbeat(self):
|
||||
self._lease.refresh()
|
||||
|
||||
|
||||
class Etcd3Driver(coordination.CoordinationDriver):
|
||||
"""An etcd based driver.
|
||||
|
||||
This driver uses etcd provide the coordination driver semantics and
|
||||
required API(s).
|
||||
"""
|
||||
|
||||
#: Default socket/lock/member/leader timeout used when none is provided.
|
||||
DEFAULT_TIMEOUT = 30
|
||||
|
||||
#: Default hostname used when none is provided.
|
||||
DEFAULT_HOST = "localhost"
|
||||
|
||||
#: Default port used if none provided (4001 or 2379 are the common ones).
|
||||
DEFAULT_PORT = 2379
|
||||
|
||||
def __init__(self, member_id, parsed_url, options):
|
||||
super(Etcd3Driver, self).__init__(member_id)
|
||||
host = parsed_url.hostname or self.DEFAULT_HOST
|
||||
port = parsed_url.port or self.DEFAULT_PORT
|
||||
options = utils.collapse(options)
|
||||
timeout = int(options.get('timeout', self.DEFAULT_TIMEOUT))
|
||||
self.client = etcd3gw.client(host=host, port=port, timeout=timeout)
|
||||
self.lock_timeout = int(options.get('lock_timeout', timeout))
|
||||
self._acquired_locks = set()
|
||||
|
||||
def get_lock(self, name):
|
||||
return Etcd3Lock(self, name, self.lock_timeout)
|
||||
|
||||
def heartbeat(self):
|
||||
# NOTE(jaypipes): Copying because set can mutate during iteration
|
||||
for lock in self._acquired_locks.copy():
|
||||
lock.heartbeat()
|
||||
return self.lock_timeout
|
||||
|
||||
def watch_join_group(self, group_id, callback):
|
||||
raise tooz.NotImplemented
|
||||
|
||||
def unwatch_join_group(self, group_id, callback):
|
||||
raise tooz.NotImplemented
|
||||
|
||||
def watch_leave_group(self, group_id, callback):
|
||||
raise tooz.NotImplemented
|
||||
|
||||
def unwatch_leave_group(self, group_id, callback):
|
||||
raise tooz.NotImplemented
|
@ -59,6 +59,8 @@ class TestWithCoordinator(testcase.TestCase):
|
||||
raise RuntimeError("No URL set for this driver")
|
||||
if os.getenv("TOOZ_TEST_ETCD3"):
|
||||
self.url = self.url.replace("etcd://", "etcd3://")
|
||||
if os.getenv("TOOZ_TEST_ETCD3GW"):
|
||||
self.url = self.url.replace("etcd://", "etcd3+http://")
|
||||
self.useFixture(fixtures.NestedTempfile())
|
||||
self.group_id = get_random_uuid()
|
||||
self.member_id = get_random_uuid()
|
||||
|
15
tox.ini
15
tox.ini
@ -1,11 +1,11 @@
|
||||
[tox]
|
||||
minversion = 1.8
|
||||
envlist = py27,py35,py27-zookeeper,py35-zookeeper,py27-redis,py35-redis,py27-sentinel,py35-sentinel,py27-memcached,py35-memcached,py27-postgresql,py35-postgresql,py27-mysql,py35-mysql,py27-consul,py35-consul,py27-etcd3,py35-etcd3,pep8
|
||||
envlist = py27,py35,py27-zookeeper,py35-zookeeper,py27-redis,py35-redis,py27-sentinel,py35-sentinel,py27-memcached,py35-memcached,py27-postgresql,py35-postgresql,py27-mysql,py35-mysql,py27-consul,py35-consul,py27-etcd3,py35-etcd3,py27-etcd3gw,py35-etcd3gw,pep8
|
||||
|
||||
[testenv]
|
||||
# We need to install a bit more than just `test' because those drivers have
|
||||
# custom tests that we always run
|
||||
deps = .[test,zake,ipc,memcached,mysql,etcd,etcd3]
|
||||
deps = .[test,zake,ipc,memcached,mysql,etcd,etcd3,etcd3gw]
|
||||
zookeeper: .[zookeeper]
|
||||
redis: .[redis]
|
||||
sentinel: .[redis]
|
||||
@ -14,6 +14,7 @@ deps = .[test,zake,ipc,memcached,mysql,etcd,etcd3]
|
||||
mysql: .[mysql]
|
||||
etcd: .[etcd]
|
||||
etcd3: .[etcd3]
|
||||
etcd3gw: .[etcd3gw]
|
||||
consul: .[consul]
|
||||
setenv =
|
||||
TOOZ_TEST_URLS = file:///tmp zake:// ipc://
|
||||
@ -81,6 +82,16 @@ setenv = TOOZ_TEST_ETCD3=1
|
||||
commands = {toxinidir}/setup-etcd-env.sh pifpaf -e TOOZ_TEST run etcd -- {toxinidir}/tools/pretty_tox.sh "{posargs}"
|
||||
{toxinidir}/setup-etcd-env.sh pifpaf -e TOOZ_TEST run etcd --cluster -- {toxinidir}/tools/pretty_tox.sh "{posargs}"
|
||||
|
||||
[testenv:py27-etcd3gw]
|
||||
setenv = TOOZ_TEST_ETCD3GW=1
|
||||
commands = {toxinidir}/setup-etcd-env.sh pifpaf -e TOOZ_TEST run etcd -- {toxinidir}/tools/pretty_tox.sh "{posargs}"
|
||||
{toxinidir}/setup-etcd-env.sh pifpaf -e TOOZ_TEST run etcd --cluster -- {toxinidir}/tools/pretty_tox.sh "{posargs}"
|
||||
|
||||
[testenv:py35-etcd3gw]
|
||||
setenv = TOOZ_TEST_ETCD3GW=1
|
||||
commands = {toxinidir}/setup-etcd-env.sh pifpaf -e TOOZ_TEST run etcd -- {toxinidir}/tools/pretty_tox.sh "{posargs}"
|
||||
{toxinidir}/setup-etcd-env.sh pifpaf -e TOOZ_TEST run etcd --cluster -- {toxinidir}/tools/pretty_tox.sh "{posargs}"
|
||||
|
||||
[testenv:py27-consul]
|
||||
commands = {toxinidir}/setup-consul-env.sh pifpaf -e TOOZ_TEST run consul -- {toxinidir}/tools/pretty_tox.sh "{posargs}"
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user