Add memcached driver
Change-Id: Ia537335d3ff1386e0d04129016fb97b5e01c5b79 Co-Author: Sahid Ferdjaoui <sahid.ferdjaoui@cloudwatt.com>
This commit is contained in:
parent
54f40c4c76
commit
214fbd6128
|
@ -1,4 +1,7 @@
|
|||
pbr>=0.5.23
|
||||
stevedore>=0.13
|
||||
six>=1.4.1
|
||||
decorator
|
||||
kazoo==1.3.1
|
||||
pymemcache>=1.2
|
||||
msgpack-python
|
||||
|
|
|
@ -14,8 +14,8 @@ function clean_exit(){
|
|||
stop_zookeeper_server
|
||||
fi
|
||||
rm -rf ${ZOO_TMP_DIR}
|
||||
kill $(jobs -p)
|
||||
return $error_code
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -49,4 +49,6 @@ if [ -d $ZOO_CONF ]; then
|
|||
start_zookeeper_server
|
||||
fi
|
||||
|
||||
memcached &
|
||||
|
||||
python setup.py testr --slowest
|
||||
|
|
|
@ -26,4 +26,4 @@ packages =
|
|||
tooz.backends =
|
||||
kazoo = tooz.drivers.zookeeper:KazooDriver
|
||||
zake = tooz.drivers.zookeeper:ZakeDriver
|
||||
|
||||
memcached = tooz.drivers.memcached:MemcachedDriver
|
||||
|
|
|
@ -0,0 +1,246 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright © 2014 eNovance
|
||||
#
|
||||
# Author: Julien Danjou <julien@danjou.info>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import decorator
|
||||
import random
|
||||
import time
|
||||
|
||||
import itertools
|
||||
import msgpack
|
||||
import pymemcache.client
|
||||
import six
|
||||
|
||||
from tooz import coordination
|
||||
|
||||
|
||||
class Retry(Exception):
|
||||
"""Exception raised if we need to retry."""
|
||||
|
||||
|
||||
@decorator.decorator
|
||||
def retry(f, *args, **kwargs):
|
||||
for try_number in itertools.chain(six.moves.range(10),
|
||||
itertools.repeat(10)):
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
except Retry:
|
||||
pass
|
||||
time.sleep(random.randint(0, (2 ** try_number)) / 1000.0)
|
||||
|
||||
|
||||
class MemcachedDriver(coordination.CoordinationDriver):
|
||||
|
||||
_GROUP_PREFIX = b'_TOOZ_GROUP_'
|
||||
_MEMBER_PREFIX = b'_TOOZ_MEMBER_'
|
||||
_GROUP_LIST_KEY = b'_TOOZ_GROUP_LIST'
|
||||
|
||||
def __init__(self, member_id, membership_timeout=30):
|
||||
super(MemcachedDriver, self).__init__()
|
||||
self._member_id = member_id
|
||||
self._groups = set()
|
||||
self.membership_timeout = membership_timeout
|
||||
|
||||
@staticmethod
|
||||
def _msgpack_serializer(key, value):
|
||||
if isinstance(value, six.binary_type):
|
||||
return value, 1
|
||||
return msgpack.dumps(value), 2
|
||||
|
||||
@staticmethod
|
||||
def _msgpack_deserializer(key, value, flags):
|
||||
if flags == 1:
|
||||
return value
|
||||
if flags == 2:
|
||||
return msgpack.loads(value)
|
||||
raise Exception("Unknown serialization format")
|
||||
|
||||
def start(self, host=("127.0.0.1", 11211), timeout=5):
|
||||
try:
|
||||
self.client = pymemcache.client.Client(
|
||||
host,
|
||||
serializer=self._msgpack_serializer,
|
||||
deserializer=self._msgpack_deserializer,
|
||||
timeout=timeout,
|
||||
connect_timeout=timeout)
|
||||
except Exception as e:
|
||||
raise coordination.ToozConnectionError(e)
|
||||
self.heartbeat()
|
||||
|
||||
def stop(self):
|
||||
self.client.delete(self._encode_member_id(self._member_id))
|
||||
map(self.leave_group, list(self._groups))
|
||||
self.client.close()
|
||||
|
||||
def _encode_group_id(self, group_id):
|
||||
return self._GROUP_PREFIX + group_id
|
||||
|
||||
def _encode_member_id(self, member_id):
|
||||
return self._MEMBER_PREFIX + member_id
|
||||
|
||||
@retry
|
||||
def _add_group_to_group_list(self, group_id):
|
||||
"""Add group to the group list.
|
||||
|
||||
:param group_id: The group id
|
||||
"""
|
||||
group_list, cas = self.client.gets(self._GROUP_LIST_KEY)
|
||||
if cas:
|
||||
group_list = set(group_list)
|
||||
group_list.add(group_id)
|
||||
if not self.client.cas(self._GROUP_LIST_KEY,
|
||||
list(group_list), cas):
|
||||
# Someone updated the group list before us, try again!
|
||||
raise Retry
|
||||
else:
|
||||
if not self.client.add(self._GROUP_LIST_KEY,
|
||||
[group_id], noreply=False):
|
||||
# Someone updated the group list before us, try again!
|
||||
raise Retry
|
||||
|
||||
def create_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
if not self.client.add(encoded_group, {}, noreply=False):
|
||||
return MemcachedAsyncError(
|
||||
coordination.GroupAlreadyExist(group_id))
|
||||
self._add_group_to_group_list(group_id)
|
||||
return MemcachedAsyncResult(None)
|
||||
|
||||
def get_groups(self):
|
||||
return MemcachedAsyncResult(
|
||||
self.client.get(self._GROUP_LIST_KEY) or [])
|
||||
|
||||
@retry
|
||||
def join_group(self, group_id, capabilities=b""):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
if not cas:
|
||||
return MemcachedAsyncError(
|
||||
coordination.GroupNotCreated(group_id))
|
||||
if self._member_id in group_members:
|
||||
return MemcachedAsyncError(
|
||||
coordination.MemberAlreadyExist(group_id, self._member_id))
|
||||
group_members[self._member_id] = {
|
||||
"capabilities": capabilities,
|
||||
}
|
||||
if not self.client.cas(encoded_group,
|
||||
group_members,
|
||||
cas):
|
||||
# It changed, let's try again
|
||||
raise Retry
|
||||
self._groups.add(group_id)
|
||||
return MemcachedAsyncResult(None)
|
||||
|
||||
@retry
|
||||
def leave_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
if not cas:
|
||||
return MemcachedAsyncError(
|
||||
coordination.GroupNotCreated(group_id))
|
||||
if self._member_id not in group_members:
|
||||
return MemcachedAsyncError(
|
||||
coordination.MemberNotJoined(group_id,
|
||||
self._member_id))
|
||||
del group_members[self._member_id]
|
||||
if not self.client.cas(encoded_group,
|
||||
group_members,
|
||||
cas):
|
||||
# It changed, let's try again
|
||||
raise Retry
|
||||
self._groups.remove(group_id)
|
||||
return MemcachedAsyncResult(None)
|
||||
|
||||
def _get_members(self, group_id):
|
||||
group_members = self.client.get(self._encode_group_id(group_id))
|
||||
if group_members is None:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
return dict((m, v) for m, v in six.iteritems(group_members)
|
||||
if self.client.get(self._encode_member_id(m)))
|
||||
|
||||
def get_members(self, group_id):
|
||||
try:
|
||||
return MemcachedAsyncResult(self._get_members(group_id).keys())
|
||||
except Exception as e:
|
||||
return MemcachedAsyncError(e)
|
||||
|
||||
def get_member_capabilities(self, group_id, member_id):
|
||||
try:
|
||||
group_members = self._get_members(group_id)
|
||||
except Exception as e:
|
||||
return MemcachedAsyncError(e)
|
||||
if member_id not in group_members:
|
||||
return MemcachedAsyncError(
|
||||
coordination.MemberNotJoined(group_id, member_id))
|
||||
return MemcachedAsyncResult(group_members[member_id][b'capabilities'])
|
||||
|
||||
@retry
|
||||
def update_capabilities(self, group_id, capabilities):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
if cas is None:
|
||||
return MemcachedAsyncError(
|
||||
coordination.GroupNotCreated(group_id))
|
||||
if self._member_id not in group_members:
|
||||
return MemcachedAsyncError(
|
||||
coordination.MemberNotJoined(group_id, self._member_id))
|
||||
group_members[self._member_id][b'capabilities'] = capabilities
|
||||
if not self.client.cas(encoded_group, group_members, cas):
|
||||
# It changed, try again
|
||||
raise Retry
|
||||
return MemcachedAsyncResult(None)
|
||||
|
||||
def heartbeat(self):
|
||||
self.client.set(self._encode_member_id(self._member_id),
|
||||
"It's alive!",
|
||||
expire=self.membership_timeout)
|
||||
|
||||
|
||||
class MemcachedAsyncResult(coordination.CoordAsyncResult):
|
||||
"""Memcached asynchronous result.
|
||||
|
||||
Unfortunately, this is mostely a fake because our driver is not
|
||||
asynchronous at all. :-(.
|
||||
|
||||
"""
|
||||
def __init__(self, result):
|
||||
self.result = result
|
||||
|
||||
def get(self, timeout=0):
|
||||
return self.result
|
||||
|
||||
@staticmethod
|
||||
def done():
|
||||
return True
|
||||
|
||||
|
||||
class MemcachedAsyncError(coordination.CoordAsyncResult):
|
||||
"""Memcached asynchronous error.
|
||||
|
||||
Unfortunately, this is mostely a fake because our driver is not
|
||||
asynchronous at all. :-(.
|
||||
|
||||
"""
|
||||
def __init__(self, error):
|
||||
self.error = error
|
||||
|
||||
def get(self, timeout=0):
|
||||
raise self.error
|
||||
|
||||
@staticmethod
|
||||
def done():
|
||||
return True
|
|
@ -0,0 +1,36 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright © 2014 eNovance
|
||||
#
|
||||
# Author: Julien Danjou <julien@danjou.info>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from testtools import testcase
|
||||
|
||||
|
||||
from tooz.drivers import memcached
|
||||
|
||||
|
||||
class TestRetry(testcase.TestCase):
|
||||
def test_retry(self):
|
||||
self.i = 1
|
||||
|
||||
@memcached.retry
|
||||
def x(add_that):
|
||||
if self.i == 1:
|
||||
self.i += add_that
|
||||
raise memcached.Retry
|
||||
return self.i
|
||||
|
||||
self.assertEqual(x(42), 43)
|
|
@ -15,6 +15,7 @@
|
|||
# under the License.
|
||||
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import testscenarios
|
||||
|
@ -36,7 +37,12 @@ fake_zookeeper_tests = ('fake_zookeeper_tests', {'backend': 'zake',
|
|||
|
||||
class TestAPI(testscenarios.TestWithScenarios, testcase.TestCase):
|
||||
|
||||
scenarios = [zookeeper_tests, fake_zookeeper_tests]
|
||||
scenarios = [
|
||||
zookeeper_tests,
|
||||
fake_zookeeper_tests,
|
||||
('memcached', {'backend': 'memcached',
|
||||
'kwargs': {'membership_timeout': 5}}),
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
super(TestAPI, self).setUp()
|
||||
|
@ -45,6 +51,9 @@ class TestAPI(testscenarios.TestWithScenarios, testcase.TestCase):
|
|||
self._coord = tooz.coordination.get_coordinator(self.backend,
|
||||
self.member_id,
|
||||
**self.kwargs)
|
||||
# HACK(jd) Disable memcached on py33 for the time being, activate as
|
||||
# soon as https://github.com/pinterest/pymemcache/pull/16 is merged
|
||||
# and pymemcache is released
|
||||
try:
|
||||
self._coord.start(timeout=5)
|
||||
except tooz.coordination.ToozConnectionError as e:
|
||||
|
@ -114,8 +123,14 @@ class TestAPI(testscenarios.TestWithScenarios, testcase.TestCase):
|
|||
all_group_ids = self._coord.get_groups().get()
|
||||
self.assertTrue(self.group_id not in all_group_ids)
|
||||
leave_group = self._coord.leave_group(self.group_id)
|
||||
self.assertRaises(tooz.coordination.MemberNotJoined,
|
||||
leave_group.get)
|
||||
try:
|
||||
leave_group.get()
|
||||
# Drivers raise one of those depending on their capability
|
||||
except (tooz.coordination.MemberNotJoined,
|
||||
tooz.coordination.GroupNotCreated):
|
||||
pass
|
||||
else:
|
||||
self.fail("Exception not raised")
|
||||
|
||||
def test_leave_group_not_joined_by_member(self):
|
||||
self._coord.create_group(self.group_id).get()
|
||||
|
@ -184,8 +199,14 @@ class TestAPI(testscenarios.TestWithScenarios, testcase.TestCase):
|
|||
def test_update_capabilities_with_group_id_nonexistent(self):
|
||||
update_cap = self._coord.update_capabilities(self.group_id,
|
||||
b'test_capabilities')
|
||||
self.assertRaises(tooz.coordination.MemberNotJoined,
|
||||
update_cap.get)
|
||||
try:
|
||||
update_cap.get()
|
||||
# Drivers raise one of those depending on their capability
|
||||
except (tooz.coordination.MemberNotJoined,
|
||||
tooz.coordination.GroupNotCreated):
|
||||
pass
|
||||
else:
|
||||
self.fail("Exception not raised")
|
||||
|
||||
def test_heartbeat(self):
|
||||
self._coord.heartbeat()
|
||||
|
@ -209,6 +230,27 @@ class TestAPI(testscenarios.TestWithScenarios, testcase.TestCase):
|
|||
self.assertTrue(self.member_id in members_ids)
|
||||
self.assertTrue(member_id_test2 not in members_ids)
|
||||
|
||||
def test_timeout(self):
|
||||
if self.backend != 'memcached':
|
||||
self.skipTest("This test only works with memcached for now")
|
||||
member_id_test2 = self._get_random_uuid()
|
||||
client2 = tooz.coordination.get_coordinator(self.backend,
|
||||
member_id_test2,
|
||||
**self.kwargs)
|
||||
client2.start()
|
||||
self._coord.create_group(self.group_id).get()
|
||||
self._coord.join_group(self.group_id).get()
|
||||
client2.join_group(self.group_id).get()
|
||||
members_ids = self._coord.get_members(self.group_id).get()
|
||||
self.assertTrue(self.member_id in members_ids)
|
||||
self.assertTrue(member_id_test2 in members_ids)
|
||||
time.sleep(3)
|
||||
self._coord.heartbeat()
|
||||
time.sleep(3)
|
||||
members_ids = self._coord.get_members(self.group_id).get()
|
||||
self.assertTrue(self.member_id in members_ids)
|
||||
self.assertTrue(member_id_test2 not in members_ids)
|
||||
|
||||
@staticmethod
|
||||
def _get_random_uuid():
|
||||
return str(uuid.uuid4()).encode('ascii')
|
||||
|
|
Loading…
Reference in New Issue