Since we use msgpack this can be more than a str
It is often nice to have capabilities that are dicts to allow for more complex capabilities that; and since we use msgpack this is already enabled for usage (since msgpack will serialize/deserialize capabilities provided as dicts already). This updates the zookeeper driver to ensure that capabilites sent are first formatted to msgpack format (before being sent) so that all drivers work in the same manner and accept the same kind of items. Change-Id: I2e19e806d38f70175706610cd92d9e22f606b41b
This commit is contained in:
@@ -226,7 +226,7 @@ class CoordinationDriver(object):
|
||||
:param group_id: the id of the group to join
|
||||
:type group_id: str
|
||||
:param capabilities: the capabilities of the joined member
|
||||
:type capabilities: str
|
||||
:type capabilities: object (typically str)
|
||||
:returns: None
|
||||
:rtype: CoordAsyncResult
|
||||
"""
|
||||
@@ -285,7 +285,7 @@ class CoordinationDriver(object):
|
||||
:param group_id: the id of the group of the current member
|
||||
:type group_id: str
|
||||
:param capabilities: the capabilities of the updated member
|
||||
:type capabilities: str
|
||||
:type capabilities: object (typically str)
|
||||
:returns: None
|
||||
:rtype: CoordAsyncResult
|
||||
"""
|
||||
|
||||
@@ -18,7 +18,6 @@ import collections
|
||||
import logging
|
||||
|
||||
from concurrent import futures
|
||||
import msgpack
|
||||
import pymemcache.client
|
||||
import six
|
||||
|
||||
@@ -103,15 +102,15 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
||||
def _msgpack_serializer(key, value):
|
||||
if isinstance(value, six.binary_type):
|
||||
return value, 1
|
||||
return msgpack.dumps(value), 2
|
||||
return utils.dumps(value), 2
|
||||
|
||||
@staticmethod
|
||||
def _msgpack_deserializer(key, value, flags):
|
||||
if flags == 1:
|
||||
return value
|
||||
if flags == 2:
|
||||
return msgpack.loads(value)
|
||||
raise Exception("Unknown serialization format")
|
||||
return utils.loads(value)
|
||||
raise Exception("Unknown serialization format '%s'" % flags)
|
||||
|
||||
def _start(self):
|
||||
try:
|
||||
@@ -213,7 +212,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
||||
raise coordination.MemberAlreadyExist(group_id,
|
||||
self._member_id)
|
||||
group_members[self._member_id] = {
|
||||
"capabilities": capabilities,
|
||||
b"capabilities": capabilities,
|
||||
}
|
||||
if not self.client.cas(encoded_group, group_members, cas):
|
||||
# It changed, let's try again
|
||||
|
||||
@@ -21,7 +21,6 @@ from distutils import version
|
||||
import logging
|
||||
|
||||
from concurrent import futures
|
||||
import msgpack
|
||||
from oslo.utils import strutils
|
||||
import redis
|
||||
from redis import exceptions
|
||||
@@ -291,17 +290,11 @@ class RedisDriver(coordination.CoordinationDriver):
|
||||
|
||||
@staticmethod
|
||||
def _dumps(data):
|
||||
try:
|
||||
return msgpack.dumps(data)
|
||||
except (msgpack.PackException, ValueError) as e:
|
||||
raise coordination.ToozError(utils.exception_message(e))
|
||||
return utils.dumps(data)
|
||||
|
||||
@staticmethod
|
||||
def _loads(blob):
|
||||
try:
|
||||
return msgpack.loads(blob)
|
||||
except (msgpack.UnpackException, ValueError) as e:
|
||||
raise coordination.ToozError(utils.exception_message(e))
|
||||
return utils.loads(blob)
|
||||
|
||||
@classmethod
|
||||
def _make_client(cls, parsed_url, options, default_socket_timeout):
|
||||
|
||||
@@ -72,6 +72,14 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
||||
def _stop(self):
|
||||
self._coord.stop()
|
||||
|
||||
@staticmethod
|
||||
def _dumps(data):
|
||||
return utils.dumps(data)
|
||||
|
||||
@staticmethod
|
||||
def _loads(blob):
|
||||
return utils.loads(blob)
|
||||
|
||||
@staticmethod
|
||||
def _create_group_handler(async_result, timeout,
|
||||
timeout_exception, group_id):
|
||||
@@ -130,6 +138,7 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
||||
|
||||
def join_group(self, group_id, capabilities=b""):
|
||||
member_path = self._path_member(group_id, self._member_id)
|
||||
capabilities = self._dumps(capabilities)
|
||||
async_result = self._coord.create_async(member_path,
|
||||
value=capabilities,
|
||||
ephemeral=True)
|
||||
@@ -191,13 +200,14 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
||||
|
||||
def update_capabilities(self, group_id, capabilities):
|
||||
member_path = self._path_member(group_id, self._member_id)
|
||||
capabilities = self._dumps(capabilities)
|
||||
async_result = self._coord.set_async(member_path, capabilities)
|
||||
return ZooAsyncResult(async_result, self._update_capabilities_handler,
|
||||
timeout_exception=self._timeout_exception,
|
||||
group_id=group_id, member_id=self._member_id)
|
||||
|
||||
@staticmethod
|
||||
def _get_member_capabilities_handler(async_result, timeout,
|
||||
@classmethod
|
||||
def _get_member_capabilities_handler(cls, async_result, timeout,
|
||||
timeout_exception, group_id,
|
||||
member_id):
|
||||
try:
|
||||
@@ -209,7 +219,7 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
||||
except exceptions.ZookeeperError as e:
|
||||
raise coordination.ToozError(utils.exception_message(e))
|
||||
else:
|
||||
return capabilities
|
||||
return cls._loads(capabilities)
|
||||
|
||||
def get_member_capabilities(self, group_id, member_id):
|
||||
member_path = self._path_member(group_id, member_id)
|
||||
|
||||
@@ -199,6 +199,17 @@ class TestAPI(testscenarios.TestWithScenarios,
|
||||
self.member_id).get()
|
||||
self.assertEqual(capa, b"test_capabilities")
|
||||
|
||||
def test_get_member_capabilities_complex(self):
|
||||
self._coord.create_group(self.group_id).get()
|
||||
caps = {
|
||||
'type': 'warrior',
|
||||
'abilities': ['fight', 'flight', 'double-hit-damage'],
|
||||
}
|
||||
self._coord.join_group(self.group_id, caps)
|
||||
capa = self._coord.get_member_capabilities(self.group_id,
|
||||
self.member_id).get()
|
||||
self.assertEqual(capa, caps)
|
||||
|
||||
def test_get_member_capabilities_nonexistent_group(self):
|
||||
capa = self._coord.get_member_capabilities(self.group_id,
|
||||
self.member_id)
|
||||
|
||||
@@ -16,6 +16,10 @@
|
||||
|
||||
import six
|
||||
|
||||
import msgpack
|
||||
|
||||
from tooz import coordination
|
||||
|
||||
|
||||
def exception_message(exc):
|
||||
"""Return the string representation of exception."""
|
||||
@@ -30,3 +34,27 @@ def to_binary(text, encoding='ascii'):
|
||||
if not isinstance(text, six.binary_type):
|
||||
text = text.encode(encoding)
|
||||
return text
|
||||
|
||||
|
||||
def dumps(data, excp_cls=coordination.ToozError):
|
||||
"""Serializes provided data using msgpack into a byte string.
|
||||
|
||||
TODO(harlowja): use oslo.serialization 'msgpackutils.py' when we can since
|
||||
that handles more native types better than the default does...
|
||||
"""
|
||||
try:
|
||||
return msgpack.packb(data, use_bin_type=True)
|
||||
except (msgpack.PackException, ValueError) as e:
|
||||
raise excp_cls(exception_message(e))
|
||||
|
||||
|
||||
def loads(blob, excp_cls=coordination.ToozError):
|
||||
"""Deserializes provided data using msgpack (from a prior byte string).
|
||||
|
||||
TODO(harlowja): use oslo.serialization 'msgpackutils.py' when we can since
|
||||
that handles more native types better than the default does...
|
||||
"""
|
||||
try:
|
||||
return msgpack.unpackb(blob, encoding='utf-8')
|
||||
except (msgpack.UnpackException, ValueError) as e:
|
||||
raise excp_cls(exception_message(e))
|
||||
|
||||
Reference in New Issue
Block a user