use tooz hashring
tooz provides hashring functionality so let's avoid duplicating it since we already use tooz Change-Id: Id40e4836c5690d252ba0830f6173587f8d5d470c
This commit is contained in:
parent
bb9d962868
commit
b530fc3ec8
@ -20,9 +20,9 @@ from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import tenacity
|
||||
import tooz.coordination
|
||||
from tooz import hashring
|
||||
|
||||
from ceilometer.i18n import _LE, _LI, _LW
|
||||
from ceilometer import utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@ -216,10 +216,10 @@ class PartitionCoordinator(object):
|
||||
raise MemberNotInGroupError(group_id, members, self._my_id)
|
||||
LOG.debug('Members of group %s are: %s, Me: %s',
|
||||
group_id, members, self._my_id)
|
||||
hr = utils.HashRing(members)
|
||||
hr = hashring.HashRing(members, partitions=100)
|
||||
iterable = list(iterable)
|
||||
filtered = [v for v in iterable
|
||||
if hr.get_node(six.text_type(v)) == self._my_id]
|
||||
if self._my_id in hr.get_nodes(self.encode_task(v))]
|
||||
LOG.debug('The universal set: %s, my subset: %s',
|
||||
[six.text_type(f) for f in iterable],
|
||||
[six.text_type(f) for f in filtered])
|
||||
@ -228,3 +228,8 @@ class PartitionCoordinator(object):
|
||||
LOG.exception(_LE('Error getting group membership info from '
|
||||
'coordination backend.'))
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def encode_task(value):
|
||||
"""encode to bytes"""
|
||||
return six.text_type(value).encode('utf-8')
|
||||
|
@ -18,10 +18,10 @@ import logging
|
||||
import mock
|
||||
from oslo_config import fixture as fixture_config
|
||||
import tooz.coordination
|
||||
from tooz import hashring
|
||||
|
||||
from ceilometer import coordination
|
||||
from ceilometer.tests import base
|
||||
from ceilometer import utils
|
||||
|
||||
|
||||
class MockToozCoordinator(object):
|
||||
@ -204,9 +204,10 @@ class TestPartitioning(base.BaseTestCase):
|
||||
agents = ['agent_%s' % i for i in range(10)]
|
||||
|
||||
expected_resources = [list() for _ in range(len(agents))]
|
||||
hr = utils.HashRing(agents)
|
||||
hr = hashring.HashRing(agents, partitions=100)
|
||||
for r in all_resources:
|
||||
key = agents.index(hr.get_node(r))
|
||||
encode = coordination.PartitionCoordinator.encode_task
|
||||
key = agents.index(list(hr.get_nodes(encode(r)))[0])
|
||||
expected_resources[key].append(r)
|
||||
|
||||
agents_kwargs = []
|
||||
@ -289,9 +290,10 @@ class TestPartitioning(base.BaseTestCase):
|
||||
agents = ['agent_%s' % i for i in range(2)]
|
||||
|
||||
expected_resources = [list() for _ in range(len(agents))]
|
||||
hr = utils.HashRing(agents)
|
||||
hr = hashring.HashRing(agents, partitions=100)
|
||||
for r in all_resources:
|
||||
key = agents.index(hr.get_node(r))
|
||||
encode = coordination.PartitionCoordinator.encode_task
|
||||
key = agents.index(list(hr.get_nodes(encode(r)))[0])
|
||||
expected_resources[key].append(r)
|
||||
|
||||
agents_kwargs = []
|
||||
|
@ -146,35 +146,3 @@ class TestUtils(base.BaseTestCase):
|
||||
self.assertEqual(utils.hash_of_set(x), utils.hash_of_set(y))
|
||||
self.assertNotEqual(utils.hash_of_set(x), utils.hash_of_set(z))
|
||||
self.assertNotEqual(utils.hash_of_set(y), utils.hash_of_set(z))
|
||||
|
||||
def test_hash_ring(self):
|
||||
num_nodes = 10
|
||||
num_keys = 1000
|
||||
|
||||
nodes = [str(x) for x in range(num_nodes)]
|
||||
hr = utils.HashRing(nodes)
|
||||
|
||||
buckets = [0] * num_nodes
|
||||
assignments = [-1] * num_keys
|
||||
for k in range(num_keys):
|
||||
n = int(hr.get_node(str(k)))
|
||||
self.assertTrue(0 <= n <= num_nodes)
|
||||
buckets[n] += 1
|
||||
assignments[k] = n
|
||||
|
||||
# at least something in each bucket
|
||||
self.assertTrue(all((c > 0 for c in buckets)))
|
||||
|
||||
# approximately even distribution
|
||||
diff = max(buckets) - min(buckets)
|
||||
self.assertTrue(diff < 0.3 * (num_keys / num_nodes))
|
||||
|
||||
# consistency
|
||||
num_nodes += 1
|
||||
nodes.append(str(num_nodes + 1))
|
||||
hr = utils.HashRing(nodes)
|
||||
for k in range(num_keys):
|
||||
n = int(hr.get_node(str(k)))
|
||||
assignments[k] -= n
|
||||
reassigned = len([c for c in assignments if c != 0])
|
||||
self.assertTrue(reassigned < num_keys / num_nodes)
|
||||
|
@ -18,13 +18,10 @@
|
||||
|
||||
"""Utilities and helper functions."""
|
||||
|
||||
import bisect
|
||||
import calendar
|
||||
import copy
|
||||
import datetime
|
||||
import decimal
|
||||
import hashlib
|
||||
import struct
|
||||
import threading
|
||||
import time
|
||||
|
||||
@ -219,37 +216,6 @@ def hash_of_set(s):
|
||||
return str(hash(frozenset(s)))
|
||||
|
||||
|
||||
class HashRing(object):
|
||||
|
||||
def __init__(self, nodes, replicas=100):
|
||||
self._ring = dict()
|
||||
self._sorted_keys = []
|
||||
|
||||
for node in nodes:
|
||||
for r in six.moves.range(replicas):
|
||||
hashed_key = self._hash('%s-%s' % (node, r))
|
||||
self._ring[hashed_key] = node
|
||||
self._sorted_keys.append(hashed_key)
|
||||
self._sorted_keys.sort()
|
||||
|
||||
@staticmethod
|
||||
def _hash(key):
|
||||
return struct.unpack_from('>I',
|
||||
hashlib.md5(decode_unicode(six
|
||||
.text_type(key))).digest())[0]
|
||||
|
||||
def _get_position_on_ring(self, key):
|
||||
hashed_key = self._hash(key)
|
||||
position = bisect.bisect(self._sorted_keys, hashed_key)
|
||||
return position if position < len(self._sorted_keys) else 0
|
||||
|
||||
def get_node(self, key):
|
||||
if not self._ring:
|
||||
return None
|
||||
pos = self._get_position_on_ring(key)
|
||||
return self._ring[self._sorted_keys[pos]]
|
||||
|
||||
|
||||
def kill_listeners(listeners):
|
||||
# NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
|
||||
# which stops new messages, and wait(), which processes remaining
|
||||
|
@ -43,7 +43,7 @@ six>=1.9.0 # MIT
|
||||
SQLAlchemy<1.1.0,>=1.0.10 # MIT
|
||||
sqlalchemy-migrate>=0.9.6 # Apache-2.0
|
||||
stevedore>=1.9.0 # Apache-2.0
|
||||
tooz>=1.28.0 # Apache-2.0
|
||||
tooz>=1.47.0 # Apache-2.0
|
||||
WebOb>=1.5.0 # MIT
|
||||
WSME>=0.8 # MIT
|
||||
# NOTE(jd) We do not import it directly, but WSME datetime string parsing
|
||||
|
Loading…
Reference in New Issue
Block a user