RPC: Specify encoding to msgpack.Packer/Unpacker

Currently, RPC requests using rpc_cli.py will crash on Python 3,
because the decoded string through msgpack-rpc is not str type
when the default encoding is not specified into msgpack.Unpacker.
On Python 2, bytes type is the same as str type, and this problem
does not occur.

The old spec of msgpack had no notation of the encoding, but now,
msgpack defines "UTF-8" as the default encoding and has the explicit
type definitions for String and Binary.
  https://github.com/msgpack/msgpack/blob/master/spec.md

This patch fixes to specify the encoding to msgpack.Packer/Unpacker
and enable to use Binary type when packing for the Python 3
compatibility.

Signed-off-by: IWASE Yusuke <iwase.yusuke0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
IWASE Yusuke 2016-11-09 14:17:17 +09:00 committed by FUJITA Tomonori
parent a45c180447
commit 1af384fa17
3 changed files with 77 additions and 99 deletions

View File

@ -14,8 +14,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# msgpack-rpc # Specification:
# http://wiki.msgpack.org/display/MSGPACK/RPC+specification # - msgpack
# https://github.com/msgpack/msgpack/blob/master/spec.md
# - msgpack-rpc
# https://github.com/msgpack-rpc/msgpack-rpc/blob/master/spec.md
from collections import deque from collections import deque
import select import select
@ -36,13 +39,8 @@ class MessageEncoder(object):
""" """
def __init__(self): def __init__(self):
super(MessageEncoder, self).__init__() super(MessageEncoder, self).__init__()
# note: on-wire msgpack has no notion of encoding. self._packer = msgpack.Packer(encoding='utf-8', use_bin_type=True)
# the msgpack-python library implicitly converts unicode to self._unpacker = msgpack.Unpacker(encoding='utf-8')
# utf-8 encoded bytes by default. we don't want to rely on
# the behaviour though because it seems to be going to change.
# cf. https://gist.github.com/methane/5022403
self._packer = msgpack.Packer(encoding=None)
self._unpacker = msgpack.Unpacker(encoding=None)
self._next_msgid = 0 self._next_msgid = 0
def _create_msgid(self): def _create_msgid(self):
@ -51,7 +49,7 @@ class MessageEncoder(object):
return this_id return this_id
def create_request(self, method, params): def create_request(self, method, params):
assert isinstance(method, six.binary_type) assert isinstance(method, (str, six.binary_type))
assert isinstance(params, list) assert isinstance(params, list)
msgid = self._create_msgid() msgid = self._create_msgid()
return (self._packer.pack( return (self._packer.pack(
@ -64,7 +62,7 @@ class MessageEncoder(object):
return self._packer.pack([MessageType.RESPONSE, msgid, error, result]) return self._packer.pack([MessageType.RESPONSE, msgid, error, result])
def create_notification(self, method, params): def create_notification(self, method, params):
assert isinstance(method, six.binary_type) assert isinstance(method, (str, six.binary_type))
assert isinstance(params, list) assert isinstance(params, list)
return self._packer.pack([MessageType.NOTIFY, method, params]) return self._packer.pack([MessageType.NOTIFY, method, params])

View File

@ -23,6 +23,8 @@ import logging
import socket import socket
import traceback import traceback
import msgpack
from ryu.services.protocols.bgp import api from ryu.services.protocols.bgp import api
from ryu.services.protocols.bgp.api.base import ApiException from ryu.services.protocols.bgp.api.base import ApiException
from ryu.services.protocols.bgp.api.base import NEXT_HOP from ryu.services.protocols.bgp.api.base import NEXT_HOP
@ -95,10 +97,8 @@ class RpcSession(Activity):
def __init__(self, socket, outgoing_msg_sink_iter): def __init__(self, socket, outgoing_msg_sink_iter):
super(RpcSession, self).__init__("RpcSession(%s)" % socket) super(RpcSession, self).__init__("RpcSession(%s)" % socket)
import msgpack self._packer = msgpack.Packer(encoding='utf-8')
self._unpacker = msgpack.Unpacker(encoding='utf-8')
self._packer = msgpack.Packer()
self._unpacker = msgpack.Unpacker()
self._next_msgid = 0 self._next_msgid = 0
self._socket = socket self._socket = socket
self._outgoing_msg_sink_iter = outgoing_msg_sink_iter self._outgoing_msg_sink_iter = outgoing_msg_sink_iter

View File

@ -15,17 +15,14 @@
# limitations under the License. # limitations under the License.
import numbers import numbers
import time import socket
import sys import struct
if sys.version_info < (2, 7): import unittest
import unittest2 as unittest
else:
import unittest
from nose.tools import raises from nose.tools import raises
import six import six
from ryu.lib import hub from ryu.lib import hub
hub.patch()
from ryu.lib import rpc from ryu.lib import rpc
@ -40,22 +37,19 @@ class Test_rpc(unittest.TestCase):
def _handle_request(self, m): def _handle_request(self, m):
e = self._server_endpoint e = self._server_endpoint
msgid, method, params = m msgid, method, params = m
if method == b'resp': if method == 'resp':
e.send_response(msgid, result=params[0]) e.send_response(msgid, result=params[0])
elif method == b'err': elif method == 'err':
e.send_response(msgid, error=params[0]) e.send_response(msgid, error=params[0])
elif method == b'callback': elif method == 'callback':
n, cb, v = params n, cb, v = params
assert n > 0 assert n > 0
self._requests.add(e.send_request(cb, [msgid, n, cb, v])) self._requests.add(e.send_request(cb, [msgid, n, cb, v]))
elif method == b'notify1': elif method == 'notify1':
e.send_notification(params[1], params[2]) e.send_notification(params[1], params[2])
e.send_response(msgid, result=params[0]) e.send_response(msgid, result=params[0])
elif method == b'shutdown': elif method == 'shutdown':
import socket how = getattr(socket, params[0])
# Though six.text_type is not needed in python2, it is
# unconditionally applied for code simplicityp
how = getattr(socket, six.text_type(params[0], 'utf-8'))
self._server_sock.shutdown(how) self._server_sock.shutdown(how)
e.send_response(msgid, result=method) e.send_response(msgid, result=method)
else: else:
@ -64,7 +58,7 @@ class Test_rpc(unittest.TestCase):
def _handle_notification(self, m): def _handle_notification(self, m):
e = self._server_endpoint e = self._server_endpoint
method, params = m method, params = m
if method == b'notify2': if method == 'notify2':
e.send_notification(params[0], params[1]) e.send_notification(params[0], params[1])
def _handle_response(self, m): def _handle_response(self, m):
@ -80,8 +74,6 @@ class Test_rpc(unittest.TestCase):
self._requests.add(e.send_request(cb, [omsgid, n, cb, v])) self._requests.add(e.send_request(cb, [omsgid, n, cb, v]))
def setUp(self): def setUp(self):
import socket
self._server_sock, self._client_sock = socket.socketpair() self._server_sock, self._client_sock = socket.socketpair()
table = { table = {
rpc.MessageType.REQUEST: self._handle_request, rpc.MessageType.REQUEST: self._handle_request,
@ -100,24 +92,24 @@ class Test_rpc(unittest.TestCase):
def test_0_call_str(self): def test_0_call_str(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = b'hoge' obj = 'hoge'
result = c.call(b'resp', [obj]) result = c.call('resp', [obj])
assert result == obj assert result == obj
assert isinstance(result, bytes) assert isinstance(result, str)
def test_0_call_int(self): def test_0_call_int(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = 12345 obj = 12345
assert isinstance(obj, int) assert isinstance(obj, int)
result = c.call(b'resp', [obj]) result = c.call('resp', [obj])
assert result == obj assert result == obj
assert isinstance(result, type(obj)) assert isinstance(result, numbers.Integral)
def test_0_call_int2(self): def test_0_call_int2(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = six.MAXSIZE obj = six.MAXSIZE
assert isinstance(obj, int) assert isinstance(obj, int)
result = c.call(b'resp', [obj]) result = c.call('resp', [obj])
assert result == obj assert result == obj
assert isinstance(result, numbers.Integral) assert isinstance(result, numbers.Integral)
@ -125,7 +117,7 @@ class Test_rpc(unittest.TestCase):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = - six.MAXSIZE - 1 obj = - six.MAXSIZE - 1
assert isinstance(obj, int) assert isinstance(obj, int)
result = c.call(b'resp', [obj]) result = c.call('resp', [obj])
assert result == obj assert result == obj
assert isinstance(result, numbers.Integral) assert isinstance(result, numbers.Integral)
@ -133,120 +125,108 @@ class Test_rpc(unittest.TestCase):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = 0xffffffffffffffff # max value for msgpack obj = 0xffffffffffffffff # max value for msgpack
assert isinstance(obj, numbers.Integral) assert isinstance(obj, numbers.Integral)
result = c.call(b'resp', [obj]) result = c.call('resp', [obj])
assert result == obj assert result == obj
assert isinstance(result, numbers.Integral) assert isinstance(result, numbers.Integral)
def test_0_call_long2(self): def test_0_call_long2(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
# NOTE: the python type of this value is int for 64-bit arch # Note: the python type of this value is int for 64-bit arch
obj = -0x8000000000000000 # min value for msgpack obj = -0x8000000000000000 # min value for msgpack
assert isinstance(obj, numbers.Integral) assert isinstance(obj, numbers.Integral)
result = c.call(b'resp', [obj]) result = c.call('resp', [obj])
assert result == obj assert result == obj
assert isinstance(result, type(obj)) assert isinstance(result, numbers.Integral)
@raises(TypeError) @raises(TypeError)
def test_0_call_bytearray(self): def test_0_call_bytearray(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = bytearray(b'foo') obj = bytearray(b'foo')
result = c.call(b'resp', [obj]) result = c.call('resp', [obj])
assert result == obj assert result == obj
assert isinstance(result, bytes) assert isinstance(result, str)
def test_1_shutdown_wr(self): def test_1_shutdown_wr(self):
# test if the server shutdown on disconnect # test if the server shutdown on disconnect
import socket
self._client_sock.shutdown(socket.SHUT_WR) self._client_sock.shutdown(socket.SHUT_WR)
hub.joinall([self._server_thread]) hub.joinall([self._server_thread])
@raises(EOFError) @raises(EOFError)
def test_1_client_shutdown_wr(self): def test_1_client_shutdown_wr(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
c.call(b'shutdown', [b'SHUT_WR']) c.call('shutdown', ['SHUT_WR'])
def test_1_call_True(self): def test_1_call_True(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = True obj = True
assert c.call(b'resp', [obj]) == obj assert c.call('resp', [obj]) == obj
def test_2_call_None(self): def test_2_call_None(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = None obj = None
assert c.call(b'resp', [obj]) is None assert c.call('resp', [obj]) is None
def test_2_call_False(self): def test_2_call_False(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = False obj = False
assert c.call(b'resp', [obj]) == obj assert c.call('resp', [obj]) == obj
def test_2_call_dict(self): def test_2_call_dict(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = {b'hoge': 1, b'fuga': 2} obj = {'hoge': 1, 'fuga': 2}
assert c.call(b'resp', [obj]) == obj assert c.call('resp', [obj]) == obj
def test_2_call_empty_dict(self): def test_2_call_empty_dict(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = {} obj = {}
assert c.call(b'resp', [obj]) == obj assert c.call('resp', [obj]) == obj
def test_2_call_array(self): def test_2_call_array(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = [1, 2, 3, 4] obj = [1, 2, 3, 4]
assert c.call(b'resp', [obj]) == obj assert c.call('resp', [obj]) == obj
def test_2_call_empty_array(self): def test_2_call_empty_array(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = [] obj = []
assert c.call(b'resp', [obj]) == obj assert c.call('resp', [obj]) == obj
def test_2_call_tuple(self): def test_2_call_tuple(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
# note: msgpack library implicitly convert a tuple into a list # Note: msgpack library implicitly convert a tuple into a list
obj = (1, 2, 3) obj = (1, 2, 3)
assert c.call(b'resp', [obj]) == list(obj) assert c.call('resp', [obj]) == list(obj)
@raises(TypeError)
def test_2_call_unicode(self): def test_2_call_unicode(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
# note: on-wire msgpack has no notion of encoding. # Note: We use encoding='utf-8' option in msgpack.Packer/Unpacker
# the msgpack library implicitly converts unicode to # in order to support Python 3.
# utf-8 encoded bytes by default. # With this option, utf-8 encoded bytes will be decoded into unicode
# we don't want to rely on the behaviour though because # type in Python 2 and str type in Python 3.
# it seems to be going to change.
# https://gist.github.com/methane/5022403
obj = u"hoge" obj = u"hoge"
result = c.call(b'resp', [obj]) result = c.call('resp', [obj])
assert result == obj assert result == obj
assert isinstance(result, bytes) assert isinstance(result, six.text_type)
def test_2_call_small_binary(self): def test_2_call_small_binary(self):
import struct
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = struct.pack("100x") obj = struct.pack("100x")
result = c.call(b'resp', [obj]) result = c.call('resp', [obj])
assert result == obj assert result == obj
assert isinstance(result, bytes) assert isinstance(result, six.binary_type)
def test_3_call_complex(self): def test_3_call_complex(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = [1, b'hoge', {b'foo': 1, 3: b'bar'}] obj = [1, 'hoge', {'foo': 1, 3: 'bar'}]
assert c.call(b'resp', [obj]) == list(obj) assert c.call('resp', [obj]) == obj
@unittest.skip("doesn't work with eventlet 0.18 and later") @unittest.skip("doesn't work with eventlet 0.18 and later")
def test_4_call_large_binary(self): def test_4_call_large_binary(self):
import struct
import sys
# note: on PyPy, this test case may hang up.
sv = getattr(sys, 'subversion', None)
if sv is not None and sv[0] == 'PyPy':
return
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = struct.pack("10000000x") obj = struct.pack("10000000x")
result = c.call(b'resp', [obj]) result = c.call('resp', [obj])
assert result == obj assert result == obj
assert isinstance(result, bytes) assert isinstance(result, six.binary_type)
def test_0_notification1(self): def test_0_notification1(self):
l = [] l = []
@ -254,15 +234,15 @@ class Test_rpc(unittest.TestCase):
def callback(n): def callback(n):
l.append(n) l.append(n)
c = rpc.Client(self._client_sock, notification_callback=callback) c = rpc.Client(self._client_sock, notification_callback=callback)
obj = b'hogehoge' obj = 'hogehoge'
robj = b'fugafuga' robj = 'fugafuga'
assert c.call(b'notify1', [robj, b'notify_hoge', [obj]]) == robj assert c.call('notify1', [robj, 'notify_hoge', [obj]]) == robj
c.receive_notification() c.receive_notification()
assert len(l) == 1 assert len(l) == 1
n = l.pop(0) n = l.pop(0)
assert n is not None assert n is not None
method, params = n method, params = n
assert method == b'notify_hoge' assert method == 'notify_hoge'
assert params[0] == obj assert params[0] == obj
def test_0_notification2(self): def test_0_notification2(self):
@ -271,21 +251,21 @@ class Test_rpc(unittest.TestCase):
def callback(n): def callback(n):
l.append(n) l.append(n)
c = rpc.Client(self._client_sock, notification_callback=callback) c = rpc.Client(self._client_sock, notification_callback=callback)
obj = b'hogehogehoge' obj = 'hogehogehoge'
c.send_notification(b'notify2', [b'notify_hoge', [obj]]) c.send_notification('notify2', ['notify_hoge', [obj]])
c.receive_notification() c.receive_notification()
assert len(l) == 1 assert len(l) == 1
n = l.pop(0) n = l.pop(0)
assert n is not None assert n is not None
method, params = n method, params = n
assert method == b'notify_hoge' assert method == 'notify_hoge'
assert params[0] == obj assert params[0] == obj
def test_0_call_error(self): def test_0_call_error(self):
c = rpc.Client(self._client_sock) c = rpc.Client(self._client_sock)
obj = b'hoge' obj = 'hoge'
try: try:
c.call(b'err', [obj]) c.call('err', [obj])
raise Exception("unexpected") raise Exception("unexpected")
except rpc.RPCError as e: except rpc.RPCError as e:
assert e.get_value() == obj assert e.get_value() == obj
@ -296,18 +276,18 @@ class Test_rpc(unittest.TestCase):
def callback(n): def callback(n):
l.append(n) l.append(n)
c = rpc.Client(self._client_sock, notification_callback=callback) c = rpc.Client(self._client_sock, notification_callback=callback)
c.send_notification(b'notify2', [b'notify_foo', []]) c.send_notification('notify2', ['notify_foo', []])
hub.sleep(0.5) # give the peer a chance to run hub.sleep(0.5) # give the peer a chance to run
obj = b'hoge' obj = 'hoge'
try: try:
c.call(b'err', [obj]) c.call('err', [obj])
raise Exception("unexpected") raise Exception("unexpected")
except rpc.RPCError as e: except rpc.RPCError as e:
assert e.get_value() == obj assert e.get_value() == obj
assert len(l) == 1 assert len(l) == 1
n = l.pop(0) n = l.pop(0)
method, params = n method, params = n
assert method == b'notify_foo' assert method == 'notify_foo'
assert params == [] assert params == []
def test_4_async_call(self): def test_4_async_call(self):
@ -319,7 +299,7 @@ class Test_rpc(unittest.TestCase):
e = rpc.EndPoint(self._client_sock) e = rpc.EndPoint(self._client_sock)
s = set() s = set()
for i in range(1, num_calls + 1): for i in range(1, num_calls + 1):
s.add(e.send_request(b'resp', [i])) s.add(e.send_request('resp', [i]))
sum = 0 sum = 0
while s: while s:
e.block() e.block()
@ -349,7 +329,7 @@ class Test_rpc(unittest.TestCase):
e = rpc.EndPoint(self._client_sock) e = rpc.EndPoint(self._client_sock)
s = set() s = set()
for i in range(1, num_calls + 1): for i in range(1, num_calls + 1):
s.add(e.send_request(b'callback', [i, b'ourcallback', 0])) s.add(e.send_request('callback', [i, 'ourcallback', 0]))
sum = 0 sum = 0
while s: while s:
e.block() e.block()
@ -368,10 +348,10 @@ class Test_rpc(unittest.TestCase):
r = e.get_request() r = e.get_request()
if r is not None: if r is not None:
msgid, method, params = r msgid, method, params = r
assert method == b'ourcallback' assert method == 'ourcallback'
omsgid, n, cb, v = params omsgid, n, cb, v = params
assert omsgid in s assert omsgid in s
assert cb == b'ourcallback' assert cb == 'ourcallback'
assert n > 0 assert n > 0
e.send_response(msgid, result=[omsgid, n - 1, cb, v + 1]) e.send_response(msgid, result=[omsgid, n - 1, cb, v + 1])
assert sum == (1 + num_calls) * num_calls / 2 assert sum == (1 + num_calls) * num_calls / 2