More retry code out of memcached
So we can use for other drivers. Change-Id: I2f0b9f5faeaf5c4506df30076499102a3293038c
This commit is contained in:
parent
b5f87e4fc5
commit
455111284f
|
@ -0,0 +1,40 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright © 2014 eNovance
|
||||
#
|
||||
# Author: Julien Danjou <julien@danjou.info>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import retrying
|
||||
|
||||
|
||||
class Retry(Exception):
|
||||
"""Exception raised if we need to retry."""
|
||||
|
||||
|
||||
def retry_if_retry_raised(exception):
|
||||
return isinstance(exception, Retry)
|
||||
|
||||
|
||||
RETRYING_KWARGS = dict(
|
||||
retry_on_exception=retry_if_retry_raised,
|
||||
wait='exponential_sleep',
|
||||
wait_exponential_max=1,
|
||||
)
|
||||
|
||||
|
||||
def retry(f):
|
||||
return retrying.retry(**RETRYING_KWARGS)(f)
|
||||
|
||||
|
||||
Retrying = retrying.Retrying
|
|
@ -22,10 +22,10 @@ import logging
|
|||
from concurrent import futures
|
||||
import msgpack
|
||||
import pymemcache.client
|
||||
import retrying
|
||||
import six
|
||||
|
||||
from tooz import coordination
|
||||
from tooz.drivers import _retry
|
||||
from tooz import locking
|
||||
from tooz import utils
|
||||
|
||||
|
@ -33,25 +33,6 @@ from tooz import utils
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Retry(Exception):
|
||||
"""Exception raised if we need to retry."""
|
||||
|
||||
|
||||
def retry_if_retry_raised(exception):
|
||||
return isinstance(exception, Retry)
|
||||
|
||||
|
||||
_RETRYING_KWARGS = dict(
|
||||
retry_on_exception=retry_if_retry_raised,
|
||||
wait='exponential_sleep',
|
||||
wait_exponential_max=1,
|
||||
)
|
||||
|
||||
|
||||
def retry(f):
|
||||
return retrying.retry(**_RETRYING_KWARGS)(f)
|
||||
|
||||
|
||||
class MemcachedLock(locking.Lock):
|
||||
_LOCK_PREFIX = b'__TOOZ_LOCK_'
|
||||
|
||||
|
@ -60,7 +41,7 @@ class MemcachedLock(locking.Lock):
|
|||
self.coord = coord
|
||||
self.timeout = timeout
|
||||
|
||||
@retry
|
||||
@_retry.retry
|
||||
def acquire(self, blocking=True):
|
||||
def _acquire():
|
||||
if self.coord.client.add(
|
||||
|
@ -72,10 +53,10 @@ class MemcachedLock(locking.Lock):
|
|||
return True
|
||||
if blocking is False:
|
||||
return False
|
||||
raise Retry
|
||||
kwargs = _RETRYING_KWARGS.copy()
|
||||
raise _retry.Retry
|
||||
kwargs = _retry.RETRYING_KWARGS.copy()
|
||||
kwargs['stop_max_delay'] = blocking
|
||||
return retrying.Retrying(**kwargs).call(_acquire)
|
||||
return _retry.Retrying(**kwargs).call(_acquire)
|
||||
|
||||
def release(self):
|
||||
if self.coord.client.delete(self.name, noreply=False):
|
||||
|
@ -174,7 +155,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
def _encode_group_leader(self, group_id):
|
||||
return self._GROUP_LEADER_PREFIX + group_id
|
||||
|
||||
@retry
|
||||
@_retry.retry
|
||||
def _add_group_to_group_list(self, group_id):
|
||||
"""Add group to the group list.
|
||||
|
||||
|
@ -187,12 +168,12 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
if not self.client.cas(self._GROUP_LIST_KEY,
|
||||
list(group_list), cas):
|
||||
# Someone updated the group list before us, try again!
|
||||
raise Retry
|
||||
raise _retry.Retry
|
||||
else:
|
||||
if not self.client.add(self._GROUP_LIST_KEY,
|
||||
[group_id], noreply=False):
|
||||
# Someone updated the group list before us, try again!
|
||||
raise Retry
|
||||
raise _retry.Retry
|
||||
|
||||
def create_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
|
@ -212,7 +193,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
def join_group(self, group_id, capabilities=b""):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
|
||||
@retry
|
||||
@_retry.retry
|
||||
def _join_group():
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
if not cas:
|
||||
|
@ -225,7 +206,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
}
|
||||
if not self.client.cas(encoded_group, group_members, cas):
|
||||
# It changed, let's try again
|
||||
raise Retry
|
||||
raise _retry.Retry
|
||||
self._groups.add(group_id)
|
||||
|
||||
return MemcachedFutureResult(self._executor.submit(_join_group))
|
||||
|
@ -233,7 +214,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
def leave_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
|
||||
@retry
|
||||
@_retry.retry
|
||||
def _leave_group():
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
if not cas:
|
||||
|
@ -243,7 +224,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
del group_members[self._member_id]
|
||||
if not self.client.cas(encoded_group, group_members, cas):
|
||||
# It changed, let's try again
|
||||
raise Retry
|
||||
raise _retry.Retry
|
||||
self._groups.discard(group_id)
|
||||
|
||||
return MemcachedFutureResult(self._executor.submit(_leave_group))
|
||||
|
@ -272,7 +253,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
def update_capabilities(self, group_id, capabilities):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
|
||||
@retry
|
||||
@_retry.retry
|
||||
def _update_capabilities():
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
if cas is None:
|
||||
|
@ -282,7 +263,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
group_members[self._member_id][b'capabilities'] = capabilities
|
||||
if not self.client.cas(encoded_group, group_members, cas):
|
||||
# It changed, try again
|
||||
raise Retry
|
||||
raise _retry.Retry
|
||||
|
||||
return MemcachedFutureResult(
|
||||
self._executor.submit(_update_capabilities))
|
||||
|
|
|
@ -19,18 +19,18 @@
|
|||
from testtools import testcase
|
||||
|
||||
|
||||
from tooz.drivers import memcached
|
||||
from tooz.drivers import _retry
|
||||
|
||||
|
||||
class TestRetry(testcase.TestCase):
|
||||
def test_retry(self):
|
||||
self.i = 1
|
||||
|
||||
@memcached.retry
|
||||
@_retry.retry
|
||||
def x(add_that):
|
||||
if self.i == 1:
|
||||
self.i += add_that
|
||||
raise memcached.Retry
|
||||
raise _retry.Retry
|
||||
return self.i
|
||||
|
||||
self.assertEqual(x(42), 43)
|
Loading…
Reference in New Issue