Coordinate distributed systems.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

143 lines
4.7KB

  1. # -*- coding: utf-8 -*-
  2. #
  3. # Copyright (C) 2016 Red Hat, Inc.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  6. # not use this file except in compliance with the License. You may obtain
  7. # a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  14. # License for the specific language governing permissions and limitations
  15. # under the License.
  16. import bisect
  17. import hashlib
  18. import six
  19. import tooz
  20. from tooz import utils
  21. class UnknownNode(tooz.ToozError):
  22. """Node is unknown."""
  23. def __init__(self, node):
  24. super(UnknownNode, self).__init__("Unknown node `%s'" % node)
  25. self.node = node
  26. class HashRing(object):
  27. """Map objects onto nodes based on their consistent hash."""
  28. DEFAULT_PARTITION_NUMBER = 2**5
  29. def __init__(self, nodes, partitions=DEFAULT_PARTITION_NUMBER):
  30. """Create a new hashring.
  31. :param nodes: List of nodes where objects will be mapped onto.
  32. :param partitions: Number of partitions to spread objects onto.
  33. """
  34. self.nodes = {}
  35. self._ring = dict()
  36. self._partitions = []
  37. self._partition_number = partitions
  38. self.add_nodes(set(nodes))
  39. def add_node(self, node, weight=1):
  40. """Add a node to the hashring.
  41. :param node: Node to add.
  42. :param weight: How many resource instances this node should manage
  43. compared to the other nodes (default 1). Higher weights will be
  44. assigned more resources. Three nodes A, B and C with weights 1, 2 and 3
  45. will each handle 1/6, 1/3 and 1/2 of the resources, respectively.
  46. """
  47. return self.add_nodes((node,), weight)
  48. def add_nodes(self, nodes, weight=1):
  49. """Add nodes to the hashring with equal weight
  50. :param nodes: Nodes to add.
  51. :param weight: How many resource instances this node should manage
  52. compared to the other nodes (default 1). Higher weights will be
  53. assigned more resources. Three nodes A, B and C with weights 1, 2 and 3
  54. will each handle 1/6, 1/3 and 1/2 of the resources, respectively.
  55. """
  56. for node in nodes:
  57. key = utils.to_binary(node, 'utf-8')
  58. key_hash = hashlib.md5(key)
  59. for r in six.moves.range(self._partition_number * weight):
  60. key_hash.update(key)
  61. self._ring[self._hash2int(key_hash)] = node
  62. self.nodes[node] = weight
  63. self._partitions = sorted(self._ring.keys())
  64. def remove_node(self, node):
  65. """Remove a node from the hashring.
  66. Raises py:exc:`UnknownNode`
  67. :param node: Node to remove.
  68. """
  69. try:
  70. weight = self.nodes.pop(node)
  71. except KeyError:
  72. raise UnknownNode(node)
  73. key = utils.to_binary(node, 'utf-8')
  74. key_hash = hashlib.md5(key)
  75. for r in six.moves.range(self._partition_number * weight):
  76. key_hash.update(key)
  77. del self._ring[self._hash2int(key_hash)]
  78. self._partitions = sorted(self._ring.keys())
  79. @staticmethod
  80. def _hash2int(key):
  81. return int(key.hexdigest(), 16)
  82. def _get_partition(self, data):
  83. hashed_key = self._hash2int(hashlib.md5(data))
  84. position = bisect.bisect(self._partitions, hashed_key)
  85. return position if position < len(self._partitions) else 0
  86. def _get_node(self, partition):
  87. return self._ring[self._partitions[partition]]
  88. def get_nodes(self, data, ignore_nodes=None, replicas=1):
  89. """Get the set of nodes which the supplied data map onto.
  90. :param data: A byte identifier to be mapped across the ring.
  91. :param ignore_nodes: Set of nodes to ignore.
  92. :param replicas: Number of replicas to use.
  93. :return: A set of nodes whose length depends on the number of replicas.
  94. """
  95. partition = self._get_partition(data)
  96. ignore_nodes = set(ignore_nodes) if ignore_nodes else set()
  97. candidates = set(self.nodes.keys()) - ignore_nodes
  98. replicas = min(replicas, len(candidates))
  99. nodes = set()
  100. while len(nodes) < replicas:
  101. node = self._get_node(partition)
  102. if node not in ignore_nodes:
  103. nodes.add(node)
  104. partition = (partition + 1
  105. if partition + 1 < len(self._partitions) else 0)
  106. return nodes
  107. def __getitem__(self, key):
  108. return self.get_nodes(key)
  109. def __len__(self):
  110. return len(self._partitions)