From d38fe0301d991300a29ab09175f110356fe47653 Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Mon, 4 Aug 2014 15:51:14 +0200 Subject: [PATCH] Switch to URL for loading backends This allow to pass options in a single string, which is going to be easier for managing options. Change-Id: I32409c09153b8abaf2b36c31f0bbf658a9d653bc --- examples/coordinator.py | 2 +- examples/coordinator_heartbeat.py | 2 +- examples/group_membership.py | 2 +- examples/group_membership_watch.py | 2 +- examples/leader_election.py | 2 +- requirements.txt | 2 +- tooz/coordination.py | 27 +++++----- tooz/drivers/ipc.py | 4 +- tooz/drivers/memcached.py | 24 +++++---- tooz/drivers/zookeeper.py | 40 +++++++-------- tooz/tests/test_coordination.py | 81 +++++++++++------------------- 11 files changed, 80 insertions(+), 108 deletions(-) diff --git a/examples/coordinator.py b/examples/coordinator.py index c6d07925..8850115f 100644 --- a/examples/coordinator.py +++ b/examples/coordinator.py @@ -1,5 +1,5 @@ from tooz import coordination -coordinator = coordination.get_coordinator('zookeeper', b'host-1') +coordinator = coordination.get_coordinator('zookeeper://localhost', b'host-1') coordinator.start() coordinator.stop() diff --git a/examples/coordinator_heartbeat.py b/examples/coordinator_heartbeat.py index 45539eb5..7f8c81b1 100644 --- a/examples/coordinator_heartbeat.py +++ b/examples/coordinator_heartbeat.py @@ -2,7 +2,7 @@ import time from tooz import coordination -coordinator = coordination.get_coordinator('memcached', b'host-1') +coordinator = coordination.get_coordinator('memcached://localhost', b'host-1') coordinator.start() while True: diff --git a/examples/group_membership.py b/examples/group_membership.py index a94b5e57..963bbb39 100644 --- a/examples/group_membership.py +++ b/examples/group_membership.py @@ -1,6 +1,6 @@ from tooz import coordination -coordinator = coordination.get_coordinator('zookeeper', b'host-1') +coordinator = coordination.get_coordinator('zookeeper://localhost', b'host-1') coordinator.start() # Create a group diff --git a/examples/group_membership_watch.py b/examples/group_membership_watch.py index 4da1963c..5204df25 100644 --- a/examples/group_membership_watch.py +++ b/examples/group_membership_watch.py @@ -1,6 +1,6 @@ from tooz import coordination -coordinator = coordination.get_coordinator('zookeeper', b'host-1') +coordinator = coordination.get_coordinator('zookeeper://localhost', b'host-1') coordinator.start() # Create a group diff --git a/examples/leader_election.py b/examples/leader_election.py index 2daea4aa..ff1d75fc 100644 --- a/examples/leader_election.py +++ b/examples/leader_election.py @@ -1,6 +1,6 @@ from tooz import coordination -coordinator = coordination.get_coordinator('zookeeper', b'host-1') +coordinator = coordination.get_coordinator('zookeeper://localhost', b'host-1') coordinator.start() # Create a group diff --git a/requirements.txt b/requirements.txt index ef2e5db4..dc29e501 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ pbr>=0.6,!=0.7,<1.0 -babel +Babel>=1.3 stevedore>=0.14 six>=1.7.0 iso8601 diff --git a/tooz/coordination.py b/tooz/coordination.py index c17a822a..98daabed 100644 --- a/tooz/coordination.py +++ b/tooz/coordination.py @@ -20,6 +20,8 @@ import collections import six from stevedore import driver +from tooz.openstack.common import network_utils + TOOZ_BACKENDS_NAMESPACE = "tooz.backends" @@ -166,14 +168,11 @@ class CoordinationDriver(object): """ raise NotImplementedError - def start(self, timeout=10): + def start(self): """Start the service engine. If needed, the establishment of a connection to the servers is initiated. - - :param timeout: Time in seconds to wait for connection to succeed. - :type timeout: int """ @staticmethod @@ -314,23 +313,21 @@ class CoordAsyncResult(object): """Returns True if the task is done, False otherwise.""" -# TODO(yassine) -# Replace kwargs by something more simple. -def get_coordinator(backend, member_id, **kwargs): +def get_coordinator(backend_url, member_id): """Initialize and load the backend. - :param backend: the current tooz provided backends are 'zookeeper' + :param backend_url: the backend URL to use :type backend: str :param member_id: the id of the member :type member_id: str - :param kwargs: additional backend specific options - :type kwargs: dict """ - return driver.DriverManager(namespace=TOOZ_BACKENDS_NAMESPACE, - name=backend, - invoke_on_load=True, - invoke_args=(member_id,), - invoke_kwds=kwargs).driver + parsed_url = network_utils.urlsplit(backend_url) + parsed_qs = six.moves.urllib.parse.parse_qs(parsed_url.query) + return driver.DriverManager( + namespace=TOOZ_BACKENDS_NAMESPACE, + name=parsed_url.scheme, + invoke_on_load=True, + invoke_args=(member_id, parsed_url, parsed_qs)).driver class ToozError(Exception): diff --git a/tooz/drivers/ipc.py b/tooz/drivers/ipc.py index 748dac2d..50cbbdad 100644 --- a/tooz/drivers/ipc.py +++ b/tooz/drivers/ipc.py @@ -44,7 +44,7 @@ class IPCLock(locking.Lock): class IPCDriver(coordination.CoordinationDriver): - def __init__(self, member_id, lock_timeout=30): + def __init__(self, member_id, parsed_url, options): """Initialize the IPC driver. :param lock_timeout: how many seconds to wait when trying to acquire @@ -55,7 +55,7 @@ class IPCDriver(coordination.CoordinationDriver): lock_timeout """ super(IPCDriver, self).__init__() - self.lock_timeout = lock_timeout + self.lock_timeout = int(options.get('lock_timeout', ['30'])[-1]) def get_lock(self, name): return IPCLock(name, self.lock_timeout) diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index ab41e2ab..026bd63f 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -79,14 +79,20 @@ class MemcachedDriver(coordination.CoordinationDriver): _MEMBER_PREFIX = b'_TOOZ_MEMBER_' _GROUP_LIST_KEY = b'_TOOZ_GROUP_LIST' - def __init__(self, member_id, membership_timeout=30, lock_timeout=30, - leader_timeout=30): + def __init__(self, member_id, parsed_url, options): super(MemcachedDriver, self).__init__() self._member_id = member_id self._groups = set() - self.membership_timeout = membership_timeout - self.lock_timeout = lock_timeout - self.leader_timeout = leader_timeout + self.host = (parsed_url.hostname or "localhost", + parsed_url.port or 11211) + default_timeout = options.get('timeout', ['30']) + self.timeout = int(default_timeout[-1]) + self.membership_timeout = int(options.get( + 'membership_timeout', default_timeout)[-1]) + self.lock_timeout = int(options.get( + 'lock_timeout', default_timeout)[-1]) + self.leader_timeout = int(options.get( + 'leader_timeout', default_timeout)[-1]) @staticmethod def _msgpack_serializer(key, value): @@ -102,14 +108,14 @@ class MemcachedDriver(coordination.CoordinationDriver): return msgpack.loads(value) raise Exception("Unknown serialization format") - def start(self, host=("127.0.0.1", 11211), timeout=5): + def start(self): try: self.client = pymemcache.client.Client( - host, + self.host, serializer=self._msgpack_serializer, deserializer=self._msgpack_deserializer, - timeout=timeout, - connect_timeout=timeout) + timeout=self.timeout, + connect_timeout=self.timeout) except Exception as e: raise coordination.ToozConnectionError(e) self._group_members = collections.defaultdict(set) diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py index 1ba581c3..6ada6f78 100644 --- a/tooz/drivers/zookeeper.py +++ b/tooz/drivers/zookeeper.py @@ -16,12 +16,14 @@ import collections import copy +import threading from kazoo import client from kazoo import exceptions from kazoo.protocol import paths import six -from zake import fake_client +import zake.fake_client +import zake.fake_storage from tooz import coordination from tooz import locking @@ -43,9 +45,14 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): _TOOZ_NAMESPACE = b"tooz" - def start(self, timeout=10): + def __init__(self, member_id, parsed_url, options): + super(BaseZooKeeperDriver, self).__init__() + self._member_id = member_id + self.timeout = int(options.get('timeout', ['10'])[-1]) + + def start(self): try: - self._coord.start(timeout=timeout) + self._coord.start(timeout=self.timeout) except self._coord.handler.timeout_exception as e: raise coordination.ToozConnectionError("operation error: %s" % (e)) @@ -201,20 +208,10 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): class KazooDriver(BaseZooKeeperDriver): """The driver using the Kazoo client against real ZooKeeper servers.""" - def __init__(self, member_id, hosts="127.0.0.1:2181", handler=None, - **kwargs): - """:param hosts: the list of zookeeper servers in the - form "ip:port2, ip2:port2". - - :param handler: a kazoo async handler to use if provided, if not - provided the default that kazoo uses internally will be used instead. - """ - - if not all((hosts, member_id)): - raise KeyError("hosts=%r, member_id=%r" % hosts, member_id) + def __init__(self, member_id, parsed_url, options): + super(KazooDriver, self).__init__(member_id, parsed_url, options) + self._coord = client.KazooClient(hosts=parsed_url.netloc) self._member_id = member_id - self._coord = client.KazooClient(hosts=hosts, handler=handler) - super(KazooDriver, self).__init__() def _watch_group(self, group_id): get_members_req = self.get_members(group_id) @@ -361,14 +358,11 @@ class ZakeDriver(BaseZooKeeperDriver): without the need of real ZooKeeper servers. """ - def __init__(self, member_id, storage=None, **kwargs): - """:param storage: a fake storage object.""" + fake_storage = zake.fake_storage.FakeStorage(threading.RLock()) - if not all((storage, member_id)): - raise KeyError("storage=%r, member_id=%r" % storage, member_id) - self._member_id = member_id - self._coord = fake_client.FakeClient(storage=storage) - super(ZakeDriver, self).__init__() + def __init__(self, member_id, parsed_url, options): + super(ZakeDriver, self).__init__(member_id, parsed_url, options) + self._coord = zake.fake_client.FakeClient(storage=self.fake_storage) @staticmethod def watch_join_group(group_id, callback): diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py index 22dab717..7b119e94 100644 --- a/tooz/tests/test_coordination.py +++ b/tooz/tests/test_coordination.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright (C) 2013 eNovance Inc. All Rights Reserved. +# Copyright © 2013-2014 eNovance Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -13,50 +13,34 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - -import threading import time import uuid import testscenarios from testtools import testcase -from zake import fake_storage import tooz.coordination from tooz import tests -# Real ZooKeeper server scenario -zookeeper_tests = ('zookeeper_tests', {'backend': 'kazoo', - 'kwargs': {'hosts': '127.0.0.1:2181'}}) - -# Fake Kazoo client scenario -fake_storage = fake_storage.FakeStorage(threading.RLock()) -fake_zookeeper_tests = ('fake_zookeeper_tests', {'backend': 'zake', - 'kwargs': {'storage': - fake_storage}}) - class TestAPI(testscenarios.TestWithScenarios, tests.TestCaseSkipNotImplemented): scenarios = [ - zookeeper_tests, - fake_zookeeper_tests, - ('memcached', {'backend': 'memcached', - 'kwargs': {'membership_timeout': 5}}), - ('ipc', {'backend': 'ipc', - 'kwargs': {'lock_timeout': 2}}), + ('zookeeper', {'url': 'kazoo://127.0.0.1:2181?timeout=5'}), + ('zake', {'url': 'zake://?timeout=5'}), + ('memcached', {'url': 'memcached://?timeout=5'}), + ('ipc', {'url': 'ipc://'}), ] def setUp(self): super(TestAPI, self).setUp() self.group_id = self._get_random_uuid() self.member_id = self._get_random_uuid() - self._coord = tooz.coordination.get_coordinator(self.backend, - self.member_id, - **self.kwargs) + self._coord = tooz.coordination.get_coordinator(self.url, + self.member_id) try: - self._coord.start(timeout=5) + self._coord.start() except tooz.coordination.ToozConnectionError as e: raise testcase.TestSkipped(str(e)) @@ -98,9 +82,8 @@ class TestAPI(testscenarios.TestWithScenarios, def test_join_group_with_member_id_already_exists(self): self._coord.create_group(self.group_id).get() self._coord.join_group(self.group_id).get() - client = tooz.coordination.get_coordinator(self.backend, - self.member_id, - **self.kwargs) + client = tooz.coordination.get_coordinator(self.url, + self.member_id) client.start() join_group = client.join_group(self.group_id) self.assertRaises(tooz.coordination.MemberAlreadyExist, @@ -144,9 +127,8 @@ class TestAPI(testscenarios.TestWithScenarios, def test_get_members(self): group_id_test2 = self._get_random_uuid() member_id_test2 = self._get_random_uuid() - client2 = tooz.coordination.get_coordinator(self.backend, - member_id_test2, - **self.kwargs) + client2 = tooz.coordination.get_coordinator(self.url, + member_id_test2) client2.start() self._coord.create_group(group_id_test2).get() @@ -213,12 +195,11 @@ class TestAPI(testscenarios.TestWithScenarios, self._coord.heartbeat() def test_disconnect_leave_group(self): - if self.backend == 'zake': + if self.url.startswith('zake://'): self.skipTest("Zake has a bug that prevent this test from working") member_id_test2 = self._get_random_uuid() - client2 = tooz.coordination.get_coordinator(self.backend, - member_id_test2, - **self.kwargs) + client2 = tooz.coordination.get_coordinator(self.url, + member_id_test2) client2.start() self._coord.create_group(self.group_id).get() self._coord.join_group(self.group_id).get() @@ -232,12 +213,11 @@ class TestAPI(testscenarios.TestWithScenarios, self.assertTrue(member_id_test2 not in members_ids) def test_timeout(self): - if self.backend != 'memcached': + if not self.url.startswith('memcached://'): self.skipTest("This test only works with memcached for now") member_id_test2 = self._get_random_uuid() - client2 = tooz.coordination.get_coordinator(self.backend, - member_id_test2, - **self.kwargs) + client2 = tooz.coordination.get_coordinator(self.url, + member_id_test2) client2.start() self._coord.create_group(self.group_id).get() self._coord.join_group(self.group_id).get() @@ -258,9 +238,8 @@ class TestAPI(testscenarios.TestWithScenarios, def test_watch_group_join(self): member_id_test2 = self._get_random_uuid() - client2 = tooz.coordination.get_coordinator(self.backend, - member_id_test2, - **self.kwargs) + client2 = tooz.coordination.get_coordinator(self.url, + member_id_test2) client2.start() self._coord.create_group(self.group_id).get() @@ -293,9 +272,8 @@ class TestAPI(testscenarios.TestWithScenarios, def test_watch_leave_group(self): member_id_test2 = self._get_random_uuid() - client2 = tooz.coordination.get_coordinator(self.backend, - member_id_test2, - **self.kwargs) + client2 = tooz.coordination.get_coordinator(self.url, + member_id_test2) client2.start() self._coord.create_group(self.group_id).get() @@ -367,9 +345,8 @@ class TestAPI(testscenarios.TestWithScenarios, self._coord.run_watchers() member_id_test2 = self._get_random_uuid() - client2 = tooz.coordination.get_coordinator(self.backend, - member_id_test2, - **self.kwargs) + client2 = tooz.coordination.get_coordinator(self.url, + member_id_test2) client2.start() client2.watch_elected_as_leader(self.group_id, self._set_event) client2.run_watchers() @@ -421,9 +398,8 @@ class TestAPI(testscenarios.TestWithScenarios, self._coord.run_watchers() member_id_test2 = self._get_random_uuid() - client2 = tooz.coordination.get_coordinator(self.backend, - member_id_test2, - **self.kwargs) + client2 = tooz.coordination.get_coordinator(self.url, + member_id_test2) client2.start() client2.watch_elected_as_leader(self.group_id, self._set_event) client2.run_watchers() @@ -468,9 +444,8 @@ class TestAPI(testscenarios.TestWithScenarios, def test_get_lock_multiple_coords(self): member_id2 = self._get_random_uuid() - client2 = tooz.coordination.get_coordinator(self.backend, - member_id2, - **self.kwargs) + client2 = tooz.coordination.get_coordinator(self.url, + member_id2) client2.start() lock_name = self._get_random_uuid()