Add a consul based driver

For now it only supports locks, in the
future it may support more functionality
as time allows (and/or if people want to
add it).

Co-Authored-By: Joshua Harlow <harlowja@gmail.com>

Implements: bp add-consul-driver

Change-Id: I6064383e9db4e90fd16f17065bd7b0934ca7e125
This commit is contained in:
Vilobh Meshram 2015-11-13 14:26:42 -08:00 committed by Julien Danjou
parent 0cd83bc971
commit f917844ee5
6 changed files with 325 additions and 2 deletions

View File

@ -17,3 +17,4 @@ futurist>=0.6.0 # Apache-2.0
oslo.utils>=3.4.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0
requests!=2.9.0,>=2.8.1 # Apache-2.0
python-consul>=0.4.7 # MIT License

108
setup-consul-env.sh Executable file
View File

@ -0,0 +1,108 @@
#!/bin/bash
set -x
set -e
CONSUL_DOWN_DIR=`mktemp -d`
CONSUL_BIN_DIR=`mktemp -d`
CONSUL_TMP_DATA_DIR=`mktemp -d`
CONSUL_VERSION="0.6.3"
CONSUL_RELEASE_URL="https://releases.hashicorp.com/consul"
if [ ! -d "$CONSUL_DOWN_DIR" ]; then
mkdir -p $CONSUL_DOWN_DIR
fi
if [ ! -d "$CONSUL_BIN_DIR" ]; then
mkdir -p $CONSUL_BIN_DIR
fi
if [ ! -d "$CONSUL_TMP_DATA_DIR" ]; then
mkdir -p $CONSUL_TMP_DATA_DIR
fi
function clean_exit(){
local error_code="$?"
local spawned=$(jobs -p)
if [ -n "$spawned" ]; then
kill $spawned
fi
rm -rf $CONSUL_TMP_DATA_DIR
rm -rf $CONSUL_BIN_DIR
rm -rf $CONSUL_DOWN_DIR
return $error_code
}
function get_leader(){
local leader=""
leader=$(curl -s http://127.0.0.1:8500/v1/status/leader)
if [ $? -ne 0 ]; then
return 1
else
echo $leader | python -c "import sys;\
import json;\
print(json.loads(sys.stdin.read()))"
return 0
fi
}
function wait_until_up(){
local leader=`get_leader`
while [ -z "$leader" ]; do
echo "Waiting for consul to respond to a leader http request"
sleep 1
leader=`get_leader`
done
}
function download_and_expand_consul() {
if [ "$(uname)" == "Darwin" ]; then
local consul_file="consul_${CONSUL_VERSION}_darwin_amd64.zip"
elif [ "$(uname -a | cut -d" " -f1)" == "Linux" ]; then
local consul_file="consul_${CONSUL_VERSION}_linux_amd64.zip"
else
echo "Unknown operating system '$(uname -a)'"
exit 1
fi
wget $CONSUL_RELEASE_URL/$CONSUL_VERSION/$consul_file \
--directory-prefix $CONSUL_DOWN_DIR
if [ $? -ne 0 ]; then
echo "Unable to download consul"
exit 1
fi
unzip $CONSUL_DOWN_DIR/$consul_file -d $CONSUL_BIN_DIR
}
function get_consul_version() {
local consul_bin="$1"
local consul_version=`$consul_bin --version | \
head -n1 | cut -d" " -f2 | \
cut -d"v" -f2`
echo $consul_version
}
trap "clean_exit" EXIT
CONSUL_BIN=`which consul || true`
if [ -z "$CONSUL_BIN" ]; then
echo "Downloading consul $CONSUL_VERSION"
download_and_expand_consul
CONSUL_BIN=$CONSUL_BIN_DIR/consul
if [ ! -e "$CONSUL_BIN" ]; then
echo "Consul executable does not exist (even after downloading it)"
exit 1
fi
else
CONSUL_VERSION=`get_consul_version "$CONSUL_BIN"`
echo "Consul $CONSUL_VERSION is already installed"
fi
$CONSUL_BIN agent -server -bootstrap-expect 1 -data-dir $CONSUL_TMP_DATA_DIR -node=agent-one -bind=127.0.0.1 &
# Give some time for the agent to elect a leader, and startup...
# TODO(harlowja): there has to be a better way to do this, there doesn't
# seem
wait_until_up
export TOOZ_TEST_CONSUL_URL="consul://localhost:8500/v1"
# Yield execution to venv command
$*

View File

@ -35,6 +35,7 @@ tooz.backends =
mysql = tooz.drivers.mysql:MySQLDriver
file = tooz.drivers.file:FileDriver
zookeeper = tooz.drivers.zookeeper:KazooDriver
consul = tooz.drivers.consul:ConsulDriver
[pbr]
warnerrors = True

205
tooz/drivers/consul.py Normal file
View File

@ -0,0 +1,205 @@
# -*- coding: utf-8 -*-
#
# Copyright © 2015 Yahoo! Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from concurrent import futures
import consul
from oslo_utils import encodeutils
import tooz
from tooz import coordination
from tooz.drivers import _retry
from tooz import locking
from tooz import utils
class ConsulLock(locking.Lock):
def __init__(self, name, node, address, session_id, client):
super(ConsulLock, self).__init__(name)
self._name = name
self._node = node
self._address = address
self._session_id = session_id
self._client = client
self.acquired = False
def acquire(self, blocking=True):
@_retry.retry(stop_max_delay=blocking)
def _acquire():
# Check if we are the owner and if we are simulate
# blocking (because consul will not block a second
# acquisition attempt by the same owner).
_index, value = self._client.kv.get(key=self._name)
if value and value.get('Session') == self._session_id:
if blocking is False:
return False
else:
raise _retry.Retry
else:
# The value can be anything.
gotten = self._client.kv.put(key=self._name,
value=u"I got it!",
acquire=self._session_id)
if gotten:
self.acquired = True
return True
if blocking is False:
return False
else:
raise _retry.Retry
return _acquire()
def release(self):
if not self.acquired:
return False
# Get the lock to verify the session ID's are same
_index, contents = self._client.kv.get(key=self._name)
if not contents:
return False
owner = contents.get('Session')
if owner == self._session_id:
removed = self._client.kv.put(key=self._name,
value=self._session_id,
release=self._session_id)
if removed:
self.acquired = False
return True
return False
class ConsulDriver(coordination.CoordinationDriver):
"""This driver uses `python-consul`_ client against `consul`_ servers.
The ConsulDriver implements a minimal set of coordination driver API(s)
needed to make Consul being used as an option for Distributed Locking. The
data is stored in Consul's key-value store.
To configure the client to your liking please refer
http://python-consul.readthedocs.org/en/latest/. Few options like 'ttl'
and 'namespace' will be passed as part of the options. 'ttl' governs the
duration till when the session holding the lock will be active.
.. _python-consul: http://python-consul.readthedocs.org/
.. _consul: https://consul.io/
"""
#: Default namespace when none is provided
TOOZ_NAMESPACE = u"tooz"
#: Default TTL
DEFAULT_TTL = 15
#: Default consul port if not provided.
DEFAULT_PORT = 8500
def __init__(self, member_id, parsed_url, options):
super(ConsulDriver, self).__init__()
options = utils.collapse(options)
self._executor = utils.ProxyExecutor.build("Consul", options)
self._host = parsed_url.hostname
self._port = parsed_url.port or self.DEFAULT_PORT
self._session_id = None
self._session_name = encodeutils.safe_decode(member_id)
self._ttl = int(options.get('ttl', self.DEFAULT_TTL))
namespace = options.get('namespace', self.TOOZ_NAMESPACE)
self._namespace = encodeutils.safe_decode(namespace)
self._client = None
def _start(self):
"""Create a client, register a node and create a session."""
self._executor.start()
# Create a consul client
if self._client is None:
self._client = consul.Consul(host=self._host, port=self._port)
local_agent = self._client.agent.self()
self._node = local_agent['Member']['Name']
self._address = local_agent['Member']['Addr']
# Register a Node
self._client.catalog.register(node=self._node,
address=self._address)
# Create a session
self._session_id = self._client.session.create(
name=self._session_name, node=self._node, ttl=self._ttl)
def _stop(self):
if self._client is not None:
if self._session_id is not None:
self._client.session.destroy(self._session_id)
self._session_id = None
self._client = None
self._executor.stop()
def get_lock(self, name):
real_name = self._paths_join(self._namespace, u"locks", name)
return ConsulLock(real_name, self._node, self._address,
session_id=self._session_id,
client=self._client)
@staticmethod
def _paths_join(*args):
pieces = []
for arg in args:
pieces.append(encodeutils.safe_decode(arg))
return u"/".join(pieces)
@staticmethod
def watch_join_group(group_id, callback):
raise tooz.NotImplemented
@staticmethod
def unwatch_join_group(group_id, callback):
raise tooz.NotImplemented
@staticmethod
def watch_leave_group(group_id, callback):
raise tooz.NotImplemented
@staticmethod
def unwatch_leave_group(group_id, callback):
raise tooz.NotImplemented
@staticmethod
def watch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented
@staticmethod
def unwatch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented
class ConsulFutureResult(coordination.CoordAsyncResult):
"""Consul asynchronous result that references a future."""
def __init__(self, fut):
self._fut = fut
def get(self, timeout=10):
try:
return self._fut.result(timeout=timeout)
except futures.TimeoutError as e:
coordination.raise_with_cause(
coordination.OperationTimedOut,
encodeutils.exception_to_unicode(e),
cause=e)
def done(self):
return self._fut.done()

View File

@ -61,7 +61,8 @@ class TestAPI(testscenarios.TestWithScenarios,
('zookeeper', {'url': os.getenv("TOOZ_TEST_ZOOKEEPER_URL"),
'bad_url': 'zookeeper://localhost:1'}),
('etcd', {'url': os.getenv("TOOZ_TEST_ETCD_URL"),
'bad_url': 'etcd://localhost:1'})
'bad_url': 'etcd://localhost:1'}),
('consul', {'url': os.getenv("TOOZ_TEST_CONSUL_URL")}),
]
def assertRaisesAny(self, exc_classes, callable_obj, *args, **kwargs):

View File

@ -1,6 +1,6 @@
[tox]
minversion = 1.6
envlist = py27,py34,py27-zookeeper,py34-zookeeper,py27-redis,py34-redis,py27-sentinel,py34-sentinel,py27-memcached,py34-memcached,py27-postgresql,py34-postgresql,py27-mysql,py34-mysql,pep8
envlist = py27,py34,py27-zookeeper,py34-zookeeper,py27-redis,py34-redis,py27-sentinel,py34-sentinel,py27-memcached,py34-memcached,py27-postgresql,py34-postgresql,py27-mysql,py34-mysql,py27-consul,py34-consul,pep8
[testenv]
deps = -r{toxinidir}/test-requirements.txt
@ -63,6 +63,13 @@ commands = {toxinidir}/setup-etcd-env.sh python setup.py testr --slowest --testr
basepython = python3.4
commands = {toxinidir}/setup-etcd-env.sh python setup.py testr --slowest --testr-args="{posargs}"
[testenv:py27-consul]
commands = {toxinidir}/setup-consul-env.sh python setup.py testr --slowest --testr-args="{posargs}"
[testenv:py34-consul]
basepython = python3.4
commands = {toxinidir}/setup-consul-env.sh python setup.py testr --slowest --testr-args="{posargs}"
[testenv:cover]
commands = python setup.py testr --slowest --coverage --testr-args="{posargs}"