controller: Support proactive connection
This patch enables to initiate OpenFlow connection from controller side by using "--ofp-switch-address-list" and "--ofp-switch-connect-interval" options. Signed-off-by: IWASE Yusuke <iwase.yusuke0@gmail.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
parent
6e69e9b8a6
commit
f775290360
@ -27,11 +27,11 @@ from ryu import cfg
|
||||
import logging
|
||||
from ryu.lib import hub
|
||||
from ryu.lib.hub import StreamServer
|
||||
import traceback
|
||||
import random
|
||||
import ssl
|
||||
from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout
|
||||
import warnings
|
||||
|
||||
import netaddr
|
||||
|
||||
import ryu.base.app_manager
|
||||
|
||||
@ -49,6 +49,7 @@ from ryu.lib.dpid import dpid_to_str
|
||||
LOG = logging.getLogger('ryu.controller.controller')
|
||||
|
||||
DEFAULT_OFP_HOST = '0.0.0.0'
|
||||
DEFAULT_OFP_SW_CON_INTERVAL = 1
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_cli_opts([
|
||||
@ -62,7 +63,14 @@ CONF.register_cli_opts([
|
||||
'(default: %d)' % ofproto_common.OFP_SSL_PORT),
|
||||
cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
|
||||
cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
|
||||
cfg.StrOpt('ca-certs', default=None, help='CA certificates')
|
||||
cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
|
||||
cfg.ListOpt('ofp-switch-address-list', item_type=str, default=[],
|
||||
help='list of IP address and port pairs (default empty). '
|
||||
'e.g., "127.0.0.1:6653,127.0.0.1:6633"'),
|
||||
cfg.IntOpt('ofp-switch-connect-interval',
|
||||
default=DEFAULT_OFP_SW_CON_INTERVAL,
|
||||
help='interval in seconds to connect to switches '
|
||||
'(default %d)' % DEFAULT_OFP_SW_CON_INTERVAL),
|
||||
])
|
||||
CONF.register_opts([
|
||||
cfg.FloatOpt('socket-timeout',
|
||||
@ -78,6 +86,38 @@ CONF.register_opts([
|
||||
])
|
||||
|
||||
|
||||
def _split_addr(addr):
|
||||
"""
|
||||
Splits a str of IP address and port pair into (host, port).
|
||||
|
||||
Example::
|
||||
|
||||
>>> _split_addr('127.0.0.1:6653')
|
||||
('127.0.0.1', 6653)
|
||||
>>> _split_addr('[::1]:6653')
|
||||
('::1', 6653)
|
||||
|
||||
Raises ValueError if invalid format.
|
||||
|
||||
:param addr: A pair of IP address and port.
|
||||
:return: IP address and port
|
||||
"""
|
||||
e = ValueError('Invalid IP address and port pair: "%s"' % addr)
|
||||
pair = addr.rsplit(':', 1)
|
||||
if len(pair) != 2:
|
||||
raise e
|
||||
|
||||
addr, port = pair
|
||||
if addr.startswith('[') and addr.endswith(']'):
|
||||
addr = addr.lstrip('[').rstrip(']')
|
||||
if not netaddr.valid_ipv6(addr):
|
||||
raise e
|
||||
elif not netaddr.valid_ipv4(addr):
|
||||
raise e
|
||||
|
||||
return addr, int(port, 0)
|
||||
|
||||
|
||||
class OpenFlowController(object):
|
||||
def __init__(self):
|
||||
super(OpenFlowController, self).__init__()
|
||||
@ -96,9 +136,18 @@ class OpenFlowController(object):
|
||||
# entry point
|
||||
def __call__(self):
|
||||
# LOG.debug('call')
|
||||
for address in CONF.ofp_switch_address_list:
|
||||
addr = tuple(_split_addr(address))
|
||||
self.spawn_client_loop(addr)
|
||||
|
||||
self.server_loop(self.ofp_tcp_listen_port,
|
||||
self.ofp_ssl_listen_port)
|
||||
|
||||
def spawn_client_loop(self, addr, interval=None):
|
||||
interval = interval or CONF.ofp_switch_connect_interval
|
||||
client = hub.StreamClient(addr)
|
||||
hub.spawn(client.connect_loop, datapath_connection_factory, interval)
|
||||
|
||||
def server_loop(self, ofp_tcp_listen_port, ofp_ssl_listen_port):
|
||||
if CONF.ctl_privkey is not None and CONF.ctl_cert is not None:
|
||||
if CONF.ca_certs is not None:
|
||||
|
@ -140,6 +140,39 @@ if HUB_TYPE == 'eventlet':
|
||||
sock, addr = self.server.accept()
|
||||
spawn(self.handle, sock, addr)
|
||||
|
||||
class StreamClient(object):
|
||||
def __init__(self, addr, timeout=None, **ssl_args):
|
||||
assert netaddr.valid_ipv4(addr[0]) or netaddr.valid_ipv6(addr[0])
|
||||
self.addr = addr
|
||||
self.timeout = timeout
|
||||
self.ssl_args = ssl_args
|
||||
self._is_active = True
|
||||
|
||||
def connect(self):
|
||||
try:
|
||||
if self.timeout is not None:
|
||||
client = socket.create_connection(self.addr,
|
||||
timeout=self.timeout)
|
||||
else:
|
||||
client = socket.create_connection(self.addr)
|
||||
except socket.error:
|
||||
return None
|
||||
|
||||
if self.ssl_args:
|
||||
client = ssl.wrap_socket(client, **self.ssl_args)
|
||||
|
||||
return client
|
||||
|
||||
def connect_loop(self, handle, interval):
|
||||
while self._is_active:
|
||||
sock = self.connect()
|
||||
if sock:
|
||||
handle(sock, self.addr)
|
||||
sleep(interval)
|
||||
|
||||
def stop(self):
|
||||
self._is_active = False
|
||||
|
||||
class LoggingWrapper(object):
|
||||
def write(self, message):
|
||||
LOG.info(message.rstrip('\n'))
|
||||
|
Loading…
x
Reference in New Issue
Block a user