Add a messagepack utils helper module

Messagepack is used by some projects, so providing a more
featureful encoding/decoding functionality that works better
with specialized types such as datetime, uuids and so-on in
a customized manner would be quite useful for those projects
that use it so they don't blow up when encoding and don't need
to resort to lossy serialization (by using jsonutils).

Change-Id: I295bfca0737301d15414410822bfbb28f66370dd
This commit is contained in:
Joshua Harlow 2015-01-14 17:13:32 -08:00
parent 33d9935675
commit 2a30128b72
3 changed files with 248 additions and 0 deletions

View File

@ -0,0 +1,130 @@
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import itertools
import sys
import uuid
import msgpack
from oslo.utils import importutils
from oslo.utils import timeutils
import six
import six.moves.xmlrpc_client as xmlrpclib
netaddr = importutils.try_import("netaddr")
# NOTE(harlowja): itertools.count only started to take a step value
# in python 2.7+ so we can't use it in 2.6...
if sys.version_info[0:2] == (2, 6):
_PY26 = True
else:
_PY26 = False
def _serialize_datetime(dt):
blob = timeutils.strtime(dt)
if six.PY3:
return blob.encode('ascii')
return blob
def _deserialize_datetime(blob):
return timeutils.parse_strtime(six.text_type(blob, encoding='ascii'))
def _serializer(obj):
# Applications can assign 0 to 127 to store
# application-specific type information...
if isinstance(obj, uuid.UUID):
return msgpack.ExtType(0, six.text_type(obj.hex).encode('ascii'))
if isinstance(obj, datetime.datetime):
return msgpack.ExtType(1, _serialize_datetime(obj))
if type(obj) == itertools.count:
# FIXME(harlowja): figure out a better way to avoid hacking into
# the string representation of count to get at the right numbers...
obj = six.text_type(obj)
start = obj.find("(") + 1
end = obj.rfind(")")
pieces = obj[start:end].split(",")
if len(pieces) == 1:
start = int(pieces[0])
step = 1
else:
start = int(pieces[0])
step = int(pieces[1])
return msgpack.ExtType(2, msgpack.packb([start, step]))
if netaddr and isinstance(obj, netaddr.IPAddress):
return msgpack.ExtType(3, msgpack.packb(obj.value))
if isinstance(obj, (set, frozenset)):
value = dumps(list(obj))
if isinstance(obj, set):
ident = 4
else:
ident = 5
return msgpack.ExtType(ident, value)
if isinstance(obj, xmlrpclib.DateTime):
dt = datetime.datetime(*tuple(obj.timetuple())[:6])
return msgpack.ExtType(6, _serialize_datetime(dt))
raise TypeError("Unknown type: %r" % (obj,))
def _unserializer(code, data):
if code == 0:
return uuid.UUID(hex=six.text_type(data, encoding='ascii'))
if code == 1:
return _deserialize_datetime(data)
if code == 2:
value = msgpack.unpackb(data)
if not _PY26:
return itertools.count(value[0], value[1])
else:
return itertools.count(value[0])
if netaddr and code == 3:
value = msgpack.unpackb(data)
return netaddr.IPAddress(value)
if code in (4, 5):
value = loads(data)
if code == 4:
return set(value)
else:
return frozenset(value)
if code == 6:
dt = _deserialize_datetime(data)
return xmlrpclib.DateTime(dt.timetuple())
return msgpack.ExtType(code, data)
def load(fp):
"""Deserialize ``fp`` into a Python object."""
# NOTE(harlowja): the reason we can't use the more native msgpack functions
# here is that the unpack() function (oddly) doesn't seem to take a
# 'ext_hook' parameter..
return msgpack.Unpacker(fp, ext_hook=_unserializer,
encoding='utf-8').unpack()
def dump(obj, fp):
"""Serialize ``obj`` as a messagepack formatted stream to ``fp``"""
return msgpack.pack(obj, fp, default=_serializer, use_bin_type=True)
def dumps(obj):
"""Serialize ``obj`` to a messagepack formatted ``str``."""
return msgpack.packb(obj, default=_serializer, use_bin_type=True)
def loads(s):
"""Deserialize ``s`` messagepack ``str`` into a Python object."""
return msgpack.unpackb(s, ext_hook=_unserializer, encoding='utf-8')

View File

@ -5,6 +5,7 @@
pbr>=0.6,!=0.7,<1.0
Babel>=1.3
six>=1.7.0
msgpack-python>=0.4.0
# Only for timeutils in openstack.common - once we're using the
# library version this can be removed.
iso8601>=0.1.9

117
tests/test_msgpackutils.py Normal file
View File

@ -0,0 +1,117 @@
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import itertools
import sys
import uuid
import netaddr
from oslotest import base as test_base
import six
import six.moves.xmlrpc_client as xmlrpclib
import testtools
from oslo_serialization import msgpackutils
# NOTE(harlowja): itertools.count only started to take a step value
# in python 2.7+ so we can't use it in 2.6...
if sys.version_info[0:2] == (2, 6):
_PY26 = True
else:
_PY26 = False
def _dumps_loads(obj):
obj = msgpackutils.dumps(obj)
return msgpackutils.loads(obj)
class MsgPackUtilsTestMixin(test_base.BaseTestCase):
def test_list(self):
self.assertEqual(_dumps_loads([1, 2, 3]), [1, 2, 3])
def test_empty_list(self):
self.assertEqual(_dumps_loads([]), [])
def test_tuple(self):
# Seems like we do lose whether it was a tuple or not...
#
# Maybe fixed someday:
#
# https://github.com/msgpack/msgpack-python/issues/98
self.assertEqual(_dumps_loads((1, 2, 3)), [1, 2, 3])
def test_dict(self):
self.assertEqual(_dumps_loads(dict(a=1, b=2, c=3)),
dict(a=1, b=2, c=3))
def test_empty_dict(self):
self.assertEqual(_dumps_loads({}), {})
def test_complex_dict(self):
src = {
'now': datetime.datetime(1920, 2, 3, 4, 5, 6, 7),
'later': datetime.datetime(1921, 2, 3, 4, 5, 6, 9),
'a': 1,
'b': 2.0,
'c': [],
'd': set([1, 2, 3]),
'zzz': uuid.uuid4(),
'yyy': 'yyy',
'ddd': b'bbb',
}
self.assertEqual(_dumps_loads(src), src)
def test_itercount(self):
it = itertools.count(1)
six.next(it)
six.next(it)
it2 = _dumps_loads(it)
self.assertEqual(six.next(it), six.next(it2))
it = itertools.count(0)
it2 = _dumps_loads(it)
self.assertEqual(six.next(it), six.next(it2))
@testtools.skipIf(_PY26, 'itertools.count step not supported')
def test_itercount_step(self):
it = itertools.count(1, 3)
it2 = _dumps_loads(it)
self.assertEqual(six.next(it), six.next(it2))
def test_set(self):
self.assertEqual(_dumps_loads(set([1, 2])), set([1, 2]))
def test_empty_set(self):
self.assertEqual(_dumps_loads(set([])), set([]))
def test_frozenset(self):
self.assertEqual(_dumps_loads(frozenset([1, 2])), frozenset([1, 2]))
def test_empty_frozenset(self):
self.assertEqual(_dumps_loads(frozenset([])), frozenset([]))
def test_datetime_preserve(self):
x = datetime.datetime(1920, 2, 3, 4, 5, 6, 7)
self.assertEqual(_dumps_loads(x), x)
def test_DateTime(self):
x = xmlrpclib.DateTime()
x.decode("19710203T04:05:06")
self.assertEqual(_dumps_loads(x), x)
def test_ipaddr(self):
thing = {'ip_addr': netaddr.IPAddress('1.2.3.4')}
self.assertEqual(_dumps_loads(thing), thing)