BGPSpeaker/net_ctrl: Support multiple RPC sessions

Signed-off-by: IWASE Yusuke <iwase.yusuke0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
IWASE Yusuke
2016-12-16 15:17:43 +09:00
committed by FUJITA Tomonori
parent 8555dda0f2
commit 42c71ad0e6

View File

@@ -94,19 +94,21 @@ class RpcSession(Activity):
and utilities that use these. It also cares about socket communication w/
RPC peer.
"""
NAME_FMT = 'RpcSession%s'
def __init__(self, sock, outgoing_msg_sink_iter):
super(RpcSession, self).__init__("RpcSession(%s)" % sock)
self.peer_name = str(sock.getpeername())
super(RpcSession, self).__init__(self.NAME_FMT % self.peer_name)
self._packer = msgpack.Packer(encoding='utf-8')
self._unpacker = msgpack.Unpacker(encoding='utf-8')
self._next_msgid = 0
self._socket = sock
self._outgoing_msg_sink_iter = outgoing_msg_sink_iter
self.peer_name = str(self._socket.getpeername())
self.is_connected = True
def stop(self):
super(RpcSession, self).stop()
self.is_connected = False
LOG.info('RPC Session to %s stopped', self.peer_name)
def _run(self):
@@ -330,7 +332,8 @@ class _NetworkController(FlexinetPeer, Activity):
# Outstanding requests, i.e. requests for which we are yet to receive
# response from peer. We currently do not have any requests going out.
self._outstanding_reqs = {}
self._rpc_session = None
# Dictionary for Peer name to RPC session.
self._rpc_sessions = {}
def _run(self, *args, **kwargs):
"""Runs RPC server.
@@ -352,19 +355,28 @@ class _NetworkController(FlexinetPeer, Activity):
def _start_rpc_session(self, sock):
"""Starts a new RPC session with given connection.
"""
if self._rpc_session and self._rpc_session.started:
self._rpc_session.stop()
session_name = RpcSession.NAME_FMT % str(sock.getpeername())
self._stop_child_activities(session_name)
self._rpc_session = RpcSession(sock, self)
self._rpc_session.start()
rpc_session = RpcSession(sock, self)
self._spawn_activity(rpc_session)
def _send_rpc_notification_to_session(self, session, method, params):
if not session.is_connected:
# Stops disconnected RPC session.
self._stop_child_activities(session.name)
return
return session.send_notification(method, params)
def send_rpc_notification(self, method, params):
if not self.started or self._rpc_session is None:
if not self.started:
return
elif not self._rpc_session.is_connected:
self._rpc_session = None
elif self._rpc_session.started:
return self._rpc_session.send_notification(method, params)
for session in list(self._child_activity_map.values()):
if not isinstance(session, RpcSession):
continue
self._send_rpc_notification_to_session(session, method, params)
def _handle_response(response):