Add partitioner implementation
Add a new object called `Partitioner' which is based on the consistent hashring implementation of tooz.hashring. The coordinator API allows to create such an object and to use it to dispatch objects ownership across several workers. Change-Id: Iae8c861d5b6c15b714f1ec3bd7090d15dd468421
This commit is contained in:
parent
f3dddcffdd
commit
9ad740fa1e
11
doc/source/tutorial/partitioner.rst
Normal file
11
doc/source/tutorial/partitioner.rst
Normal file
@ -0,0 +1,11 @@
|
||||
=============
|
||||
Partitioner
|
||||
=============
|
||||
|
||||
Tooz provides a partitioner object based on its consistent hash ring
|
||||
implementation. It can be used to map Python objects to one or several nodes.
|
||||
The partitioner object automatically keeps track of nodes joining and leaving
|
||||
the group, so the rebalancing is managed.
|
||||
|
||||
.. literalinclude:: ../../../examples/partitioner.py
|
||||
:language: python
|
11
examples/partitioner.py
Normal file
11
examples/partitioner.py
Normal file
@ -0,0 +1,11 @@
|
||||
from tooz import coordination
|
||||
|
||||
coordinator = coordination.get_coordinator('zake://', b'host-1')
|
||||
coordinator.start()
|
||||
partitioner = coordinator.join_partitioned_group("group1")
|
||||
|
||||
# Returns {'host-1'}
|
||||
member = partitioner.members_for_object(object())
|
||||
|
||||
coordinator.leave_partitioned_group(partitioner)
|
||||
coordinator.stop()
|
6
releasenotes/notes/partitioner-4005767d287dc7c9.yaml
Normal file
6
releasenotes/notes/partitioner-4005767d287dc7c9.yaml
Normal file
@ -0,0 +1,6 @@
|
||||
---
|
||||
features:
|
||||
- >-
|
||||
Introduce a new partitioner object. This object is synchronized within a
|
||||
group of nodes and exposes a way to distribute object management across
|
||||
several nodes.
|
@ -29,6 +29,7 @@ from stevedore import driver
|
||||
|
||||
import tooz
|
||||
from tooz import _retry
|
||||
from tooz import partitioner
|
||||
from tooz import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -258,6 +259,40 @@ class CoordinationDriver(object):
|
||||
return (group_id in self._hooks_join_group or
|
||||
group_id in self._hooks_leave_group)
|
||||
|
||||
def join_partitioned_group(
|
||||
self, group_id,
|
||||
weight=1,
|
||||
partitions=partitioner.Partitioner.DEFAULT_PARTITION_NUMBER):
|
||||
"""Join a group and get a partitioner.
|
||||
|
||||
A partitioner allows to distribute a bunch of objects across several
|
||||
members using a consistent hash ring. Each object gets assigned (at
|
||||
least) one member responsible for it. It's then possible to check which
|
||||
object is owned by any member of the group.
|
||||
|
||||
This method also creates if necessary, and joins the group with the
|
||||
selected weight.
|
||||
|
||||
:param group_id: The group to create a partitioner for.
|
||||
:param weight: The weight to use in the hashring for this node.
|
||||
:param partitions: The number of partitions to create.
|
||||
:return: A :py:class:`~tooz.partitioner.Partitioner` object.
|
||||
|
||||
"""
|
||||
self.join_group_create(
|
||||
group_id, capabilities=utils.dumps({'weight': weight}))
|
||||
return partitioner.Partitioner(self, group_id)
|
||||
|
||||
def leave_partitioned_group(self, partitioner):
|
||||
"""Leave a partitioned group.
|
||||
|
||||
This leaves the partitioned group and stop the partitioner.
|
||||
:param group_id: The group to create a partitioner for.
|
||||
"""
|
||||
leave = self.leave_group(partitioner.group_id)
|
||||
partitioner.stop()
|
||||
return leave.get()
|
||||
|
||||
@staticmethod
|
||||
def run_watchers(timeout=None):
|
||||
"""Run the watchers callback.
|
||||
|
@ -31,7 +31,9 @@ class UnknownNode(tooz.ToozError):
|
||||
class HashRing(object):
|
||||
"""Map objects onto nodes based on their consistent hash."""
|
||||
|
||||
def __init__(self, nodes, partitions=2**5):
|
||||
DEFAULT_PARTITION_NUMBER = 2**5
|
||||
|
||||
def __init__(self, nodes, partitions=DEFAULT_PARTITION_NUMBER):
|
||||
"""Create a new hashring.
|
||||
|
||||
:param nodes: List of nodes where objects will be mapped onto.
|
||||
|
107
tooz/partitioner.py
Normal file
107
tooz/partitioner.py
Normal file
@ -0,0 +1,107 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (C) 2016 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import logging
|
||||
|
||||
from tooz import hashring
|
||||
from tooz import utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Partitioner(object):
|
||||
"""Partition set of objects across several members.
|
||||
|
||||
Objects to be partitioned should implement the __tooz_hash__ method to
|
||||
identify themselves across the consistent hashring. This should method
|
||||
return bytes.
|
||||
|
||||
"""
|
||||
|
||||
DEFAULT_PARTITION_NUMBER = hashring.HashRing.DEFAULT_PARTITION_NUMBER
|
||||
|
||||
def __init__(self, coordinator, group_id,
|
||||
partitions=DEFAULT_PARTITION_NUMBER):
|
||||
self.partitions = partitions
|
||||
self.group_id = group_id
|
||||
self._coord = coordinator
|
||||
self._coord.watch_join_group(self.group_id, self._on_member_join)
|
||||
self._coord.watch_leave_group(self.group_id, self._on_member_leave)
|
||||
members = self._coord.get_members(self.group_id)
|
||||
self.ring = hashring.HashRing(members.get(),
|
||||
partitions=self.partitions)
|
||||
|
||||
def _on_member_join(self, event):
|
||||
try:
|
||||
weight = utils.loads(self._coord.get_member_capabilities(
|
||||
self.group_id, event.member_id).get()).get("weight", 1)
|
||||
except utils.SerializationError:
|
||||
# This node does not seem to have joined with the partitioner
|
||||
# system, so just ignore it.
|
||||
LOG.warning(
|
||||
"Node %s did not join group %s in partition mode, ignoring",
|
||||
self.group_id, event.member_id)
|
||||
else:
|
||||
self.ring.add_node(event.member_id, weight)
|
||||
|
||||
def _on_member_leave(self, event):
|
||||
self.ring.remove_node(event.member_id)
|
||||
|
||||
@staticmethod
|
||||
def _hash_object(obj):
|
||||
if hasattr(obj, "__tooz_hash__"):
|
||||
return obj.__tooz_hash__()
|
||||
return str(hash(obj)).encode('ascii')
|
||||
|
||||
def members_for_object(self, obj, ignore_members=None, replicas=1):
|
||||
"""Return the members responsible for an object.
|
||||
|
||||
:param obj: The object to check owning for.
|
||||
:param member_id: The member to check if it owns the object.
|
||||
:param ignore_members: Group members to ignore.
|
||||
:param replicas: Number of replicas for the object.
|
||||
"""
|
||||
return self.ring.get_nodes(self._hash_object(obj),
|
||||
ignore_nodes=ignore_members,
|
||||
replicas=replicas)
|
||||
|
||||
def belongs_to_member(self, obj, member_id,
|
||||
ignore_members=None, replicas=1):
|
||||
"""Return whether an object belongs to a member.
|
||||
|
||||
:param obj: The object to check owning for.
|
||||
:param member_id: The member to check if it owns the object.
|
||||
:param ignore_members: Group members to ignore.
|
||||
:param replicas: Number of replicas for the object.
|
||||
"""
|
||||
return member_id in self.members_for_object(
|
||||
obj, ignore_members=ignore_members, replicas=replicas)
|
||||
|
||||
def belongs_to_self(self, obj, ignore_members=None, replicas=1):
|
||||
"""Return whether an object belongs to this coordinator.
|
||||
|
||||
:param obj: The object to check owning for.
|
||||
:param ignore_members: Group members to ignore.
|
||||
:param replicas: Number of replicas for the object.
|
||||
"""
|
||||
return self.belongs_to_member(obj, self._coord._member_id,
|
||||
ignore_members=ignore_members,
|
||||
replicas=replicas)
|
||||
|
||||
def stop(self):
|
||||
"""Stop the partitioner."""
|
||||
self._coord.unwatch_join_group(self.group_id, self._on_member_join)
|
||||
self._coord.unwatch_leave_group(self.group_id, self._on_member_leave)
|
@ -15,7 +15,9 @@
|
||||
# under the License.
|
||||
|
||||
import functools
|
||||
import os
|
||||
|
||||
import fixtures
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
from testtools import testcase
|
||||
@ -48,5 +50,20 @@ class SkipNotImplementedMeta(type):
|
||||
|
||||
|
||||
@six.add_metaclass(SkipNotImplementedMeta)
|
||||
class TestCaseSkipNotImplemented(testcase.TestCase):
|
||||
pass
|
||||
class TestWithCoordinator(testcase.TestCase):
|
||||
url = os.getenv("TOOZ_TEST_URL")
|
||||
|
||||
def setUp(self):
|
||||
super(TestWithCoordinator, self).setUp()
|
||||
if self.url is None:
|
||||
self.skipTest("No URL set for this driver")
|
||||
self.useFixture(fixtures.NestedTempfile())
|
||||
self.group_id = get_random_uuid()
|
||||
self.member_id = get_random_uuid()
|
||||
self._coord = tooz.coordination.get_coordinator(self.url,
|
||||
self.member_id)
|
||||
self._coord.start()
|
||||
|
||||
def tearDown(self):
|
||||
self._coord.stop()
|
||||
super(TestWithCoordinator, self).tearDown()
|
||||
|
@ -14,12 +14,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
from concurrent import futures
|
||||
import fixtures
|
||||
import mock
|
||||
from six.moves.urllib import parse
|
||||
from testtools import matchers
|
||||
@ -39,10 +37,7 @@ def try_to_lock_job(name, coord, url, member_id):
|
||||
return lock2.acquire(blocking=False)
|
||||
|
||||
|
||||
class TestAPI(tests.TestCaseSkipNotImplemented):
|
||||
|
||||
url = os.getenv("TOOZ_TEST_URL")
|
||||
|
||||
class TestAPI(tests.TestWithCoordinator):
|
||||
def assertRaisesAny(self, exc_classes, callable_obj, *args, **kwargs):
|
||||
checkers = [matchers.MatchesException(exc_class)
|
||||
for exc_class in exc_classes]
|
||||
@ -50,21 +45,6 @@ class TestAPI(tests.TestCaseSkipNotImplemented):
|
||||
callable_obj = testcase.Nullary(callable_obj, *args, **kwargs)
|
||||
self.assertThat(callable_obj, matcher)
|
||||
|
||||
def setUp(self):
|
||||
super(TestAPI, self).setUp()
|
||||
if self.url is None:
|
||||
self.skipTest("No URL set for this driver")
|
||||
self.useFixture(fixtures.NestedTempfile())
|
||||
self.group_id = tests.get_random_uuid()
|
||||
self.member_id = tests.get_random_uuid()
|
||||
self._coord = tooz.coordination.get_coordinator(self.url,
|
||||
self.member_id)
|
||||
self._coord.start()
|
||||
|
||||
def tearDown(self):
|
||||
self._coord.stop()
|
||||
super(TestAPI, self).tearDown()
|
||||
|
||||
def test_connection_error_bad_host(self):
|
||||
if (tooz.coordination.Characteristics.DISTRIBUTED_ACROSS_HOSTS
|
||||
not in self._coord.CHARACTERISTICS):
|
||||
|
92
tooz/tests/test_partitioner.py
Normal file
92
tooz/tests/test_partitioner.py
Normal file
@ -0,0 +1,92 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright © 2016 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import six
|
||||
|
||||
from tooz import coordination
|
||||
from tooz import tests
|
||||
|
||||
|
||||
class TestPartitioner(tests.TestWithCoordinator):
|
||||
|
||||
def setUp(self):
|
||||
super(TestPartitioner, self).setUp()
|
||||
self._extra_coords = []
|
||||
|
||||
def tearDown(self):
|
||||
for c in self._extra_coords:
|
||||
c.stop()
|
||||
super(TestPartitioner, self).tearDown()
|
||||
|
||||
def _add_members(self, number_of_members):
|
||||
for _ in six.moves.range(number_of_members):
|
||||
m = tests.get_random_uuid()
|
||||
coord = coordination.get_coordinator(self.url, m)
|
||||
coord.start()
|
||||
coord.join_partitioned_group(self.group_id)
|
||||
self._extra_coords.append(coord)
|
||||
self._coord.run_watchers()
|
||||
|
||||
def _remove_members(self, number_of_members):
|
||||
for _ in six.moves.range(number_of_members):
|
||||
c = self._extra_coords.pop()
|
||||
c.stop()
|
||||
self._coord.run_watchers()
|
||||
|
||||
def test_join_partitioned_group(self):
|
||||
group_id = tests.get_random_uuid()
|
||||
self._coord.join_partitioned_group(group_id)
|
||||
|
||||
def test_hashring_size(self):
|
||||
p = self._coord.join_partitioned_group(self.group_id)
|
||||
self.assertEqual(1, len(p.ring.nodes))
|
||||
self._add_members(1)
|
||||
self.assertEqual(2, len(p.ring.nodes))
|
||||
self._add_members(2)
|
||||
self.assertEqual(4, len(p.ring.nodes))
|
||||
self._remove_members(3)
|
||||
self.assertEqual(1, len(p.ring.nodes))
|
||||
p.stop()
|
||||
|
||||
def test_stop(self):
|
||||
p = self._coord.join_partitioned_group(self.group_id)
|
||||
p.stop()
|
||||
self.assertEqual(0, len(self._coord._hooks_join_group))
|
||||
self.assertEqual(0, len(self._coord._hooks_leave_group))
|
||||
|
||||
def test_members_of_object_and_others(self):
|
||||
p = self._coord.join_partitioned_group(self.group_id)
|
||||
self._add_members(3)
|
||||
o = object()
|
||||
m = p.members_for_object(o)
|
||||
self.assertEqual(1, len(m))
|
||||
m = m.pop()
|
||||
self.assertTrue(p.belongs_to_member(o, m))
|
||||
self.assertFalse(p.belongs_to_member(o, b"chupacabra"))
|
||||
maybe = self.assertTrue if m == self.member_id else self.assertFalse
|
||||
maybe(p.belongs_to_self(o))
|
||||
p.stop()
|
||||
|
||||
|
||||
class ZakeTestPartitioner(TestPartitioner):
|
||||
url = "zake://"
|
||||
|
||||
|
||||
class IPCTestPartitioner(TestPartitioner):
|
||||
url = "ipc://"
|
||||
|
||||
|
||||
class FileTestPartitioner(TestPartitioner):
|
||||
url = "file:///tmp"
|
Loading…
x
Reference in New Issue
Block a user