When the controller receive loop currently accepts an OpenFlow message, it does not validate the message length. As a result, a malicious or malfunctioning switch could cause send a message that would result in the receive loop making no forward progress. This patch ensures that the message length passed in the OpenFlow message is validated against the specified minimum, and forced to that value if it is smaller. Thanks to Samuel Jero (at Purdue's Dependable and Secure Distributed Systems Lab) for discovering this issue. Signed-off-by: Victor J. Orlikowski <vjo@duke.edu> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
469 lines
19 KiB
Python
469 lines
19 KiB
Python
# Copyright (C) 2011, 2012 Nippon Telegraph and Telephone Corporation.
|
|
# Copyright (C) 2011, 2012 Isaku Yamahata <yamahata at valinux co jp>
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
The main component of OpenFlow controller.
|
|
|
|
- Handle connections from switches
|
|
- Generate and route events to appropriate entities like Ryu applications
|
|
|
|
"""
|
|
|
|
import contextlib
|
|
from ryu import cfg
|
|
import logging
|
|
from ryu.lib import hub
|
|
from ryu.lib.hub import StreamServer
|
|
import traceback
|
|
import random
|
|
import ssl
|
|
from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout
|
|
import warnings
|
|
|
|
import ryu.base.app_manager
|
|
|
|
from ryu.ofproto import ofproto_common
|
|
from ryu.ofproto import ofproto_parser
|
|
from ryu.ofproto import ofproto_protocol
|
|
from ryu.ofproto import ofproto_v1_0
|
|
from ryu.ofproto import nx_match
|
|
|
|
from ryu.controller import ofp_event
|
|
from ryu.controller.handler import HANDSHAKE_DISPATCHER, DEAD_DISPATCHER
|
|
|
|
from ryu.lib.dpid import dpid_to_str
|
|
|
|
LOG = logging.getLogger('ryu.controller.controller')
|
|
|
|
CONF = cfg.CONF
|
|
CONF.register_cli_opts([
|
|
cfg.StrOpt('ofp-listen-host', default='', help='openflow listen host'),
|
|
cfg.IntOpt('ofp-tcp-listen-port', default=None,
|
|
help='openflow tcp listen port '
|
|
'(default: %d)' % ofproto_common.OFP_TCP_PORT),
|
|
cfg.IntOpt('ofp-ssl-listen-port', default=None,
|
|
help='openflow ssl listen port '
|
|
'(default: %d)' % ofproto_common.OFP_SSL_PORT),
|
|
cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
|
|
cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
|
|
cfg.StrOpt('ca-certs', default=None, help='CA certificates')
|
|
])
|
|
CONF.register_opts([
|
|
cfg.FloatOpt('socket-timeout',
|
|
default=5.0,
|
|
help='Time, in seconds, to await completion of socket operations.'),
|
|
cfg.FloatOpt('echo-request-interval',
|
|
default=15.0,
|
|
help='Time, in seconds, between sending echo requests to a datapath.'),
|
|
cfg.IntOpt('maximum-unreplied-echo-requests',
|
|
default=0,
|
|
min=0,
|
|
help='Maximum number of unreplied echo requests before datapath is disconnected.')
|
|
])
|
|
|
|
|
|
class OpenFlowController(object):
|
|
def __init__(self):
|
|
super(OpenFlowController, self).__init__()
|
|
if not CONF.ofp_tcp_listen_port and not CONF.ofp_ssl_listen_port:
|
|
self.ofp_tcp_listen_port = ofproto_common.OFP_TCP_PORT
|
|
self.ofp_ssl_listen_port = ofproto_common.OFP_SSL_PORT
|
|
# For the backward compatibility, we spawn a server loop
|
|
# listening on the old OpenFlow listen port 6633.
|
|
hub.spawn(self.server_loop,
|
|
ofproto_common.OFP_TCP_PORT_OLD,
|
|
ofproto_common.OFP_SSL_PORT_OLD)
|
|
else:
|
|
self.ofp_tcp_listen_port = CONF.ofp_tcp_listen_port
|
|
self.ofp_ssl_listen_port = CONF.ofp_ssl_listen_port
|
|
|
|
# entry point
|
|
def __call__(self):
|
|
# LOG.debug('call')
|
|
self.server_loop(self.ofp_tcp_listen_port,
|
|
self.ofp_ssl_listen_port)
|
|
|
|
def server_loop(self, ofp_tcp_listen_port, ofp_ssl_listen_port):
|
|
if CONF.ctl_privkey is not None and CONF.ctl_cert is not None:
|
|
if CONF.ca_certs is not None:
|
|
server = StreamServer((CONF.ofp_listen_host,
|
|
ofp_ssl_listen_port),
|
|
datapath_connection_factory,
|
|
keyfile=CONF.ctl_privkey,
|
|
certfile=CONF.ctl_cert,
|
|
cert_reqs=ssl.CERT_REQUIRED,
|
|
ca_certs=CONF.ca_certs,
|
|
ssl_version=ssl.PROTOCOL_TLSv1)
|
|
else:
|
|
server = StreamServer((CONF.ofp_listen_host,
|
|
ofp_ssl_listen_port),
|
|
datapath_connection_factory,
|
|
keyfile=CONF.ctl_privkey,
|
|
certfile=CONF.ctl_cert,
|
|
ssl_version=ssl.PROTOCOL_TLSv1)
|
|
else:
|
|
server = StreamServer((CONF.ofp_listen_host,
|
|
ofp_tcp_listen_port),
|
|
datapath_connection_factory)
|
|
|
|
# LOG.debug('loop')
|
|
server.serve_forever()
|
|
|
|
|
|
def _deactivate(method):
|
|
def deactivate(self):
|
|
try:
|
|
method(self)
|
|
finally:
|
|
try:
|
|
self.socket.shutdown(SHUT_RDWR)
|
|
except (EOFError, IOError):
|
|
pass
|
|
|
|
if not self.is_active:
|
|
self.socket.close()
|
|
return deactivate
|
|
|
|
|
|
class Datapath(ofproto_protocol.ProtocolDesc):
|
|
"""
|
|
A class to describe an OpenFlow switch connected to this controller.
|
|
|
|
An instance has the following attributes.
|
|
|
|
.. tabularcolumns:: |l|L|
|
|
|
|
==================================== ======================================
|
|
Attribute Description
|
|
==================================== ======================================
|
|
id 64-bit OpenFlow Datapath ID.
|
|
Only available for
|
|
ryu.controller.handler.MAIN_DISPATCHER
|
|
phase.
|
|
ofproto A module which exports OpenFlow
|
|
definitions, mainly constants appeared
|
|
in the specification, for the
|
|
negotiated OpenFlow version. For
|
|
example, ryu.ofproto.ofproto_v1_0 for
|
|
OpenFlow 1.0.
|
|
ofproto_parser A module which exports OpenFlow wire
|
|
message encoder and decoder for the
|
|
negotiated OpenFlow version.
|
|
For example,
|
|
ryu.ofproto.ofproto_v1_0_parser
|
|
for OpenFlow 1.0.
|
|
ofproto_parser.OFPxxxx(datapath,...) A callable to prepare an OpenFlow
|
|
message for the given switch. It can
|
|
be sent with Datapath.send_msg later.
|
|
xxxx is a name of the message. For
|
|
example OFPFlowMod for flow-mod
|
|
message. Arguemnts depend on the
|
|
message.
|
|
set_xid(self, msg) Generate an OpenFlow XID and put it
|
|
in msg.xid.
|
|
send_msg(self, msg) Queue an OpenFlow message to send to
|
|
the corresponding switch. If msg.xid
|
|
is None, set_xid is automatically
|
|
called on the message before queueing.
|
|
send_packet_out deprecated
|
|
send_flow_mod deprecated
|
|
send_flow_del deprecated
|
|
send_delete_all_flows deprecated
|
|
send_barrier Queue an OpenFlow barrier message to
|
|
send to the switch.
|
|
send_nxt_set_flow_format deprecated
|
|
is_reserved_port deprecated
|
|
==================================== ======================================
|
|
"""
|
|
|
|
def __init__(self, socket, address):
|
|
super(Datapath, self).__init__()
|
|
|
|
self.socket = socket
|
|
self.socket.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
|
|
self.socket.settimeout(CONF.socket_timeout)
|
|
self.address = address
|
|
self.is_active = True
|
|
|
|
# The limit is arbitrary. We need to limit queue size to
|
|
# prevent it from eating memory up.
|
|
self.send_q = hub.Queue(16)
|
|
self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
|
|
|
|
self.echo_request_interval = CONF.echo_request_interval
|
|
self.max_unreplied_echo_requests = CONF.maximum_unreplied_echo_requests
|
|
self.unreplied_echo_requests = []
|
|
|
|
self.xid = random.randint(0, self.ofproto.MAX_XID)
|
|
self.id = None # datapath_id is unknown yet
|
|
self._ports = None
|
|
self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
|
|
self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
|
|
self.set_state(HANDSHAKE_DISPATCHER)
|
|
|
|
@_deactivate
|
|
def close(self):
|
|
if self.state != DEAD_DISPATCHER:
|
|
self.set_state(DEAD_DISPATCHER)
|
|
|
|
def set_state(self, state):
|
|
self.state = state
|
|
ev = ofp_event.EventOFPStateChange(self)
|
|
ev.state = state
|
|
self.ofp_brick.send_event_to_observers(ev, state)
|
|
|
|
# Low level socket handling layer
|
|
@_deactivate
|
|
def _recv_loop(self):
|
|
buf = bytearray()
|
|
count = 0
|
|
min_read_len = remaining_read_len = ofproto_common.OFP_HEADER_SIZE
|
|
|
|
while self.state != DEAD_DISPATCHER:
|
|
try:
|
|
read_len = min_read_len
|
|
if (remaining_read_len > min_read_len):
|
|
read_len = remaining_read_len
|
|
ret = self.socket.recv(read_len)
|
|
except SocketTimeout:
|
|
continue
|
|
except ssl.SSLError:
|
|
# eventlet throws SSLError (which is a subclass of IOError)
|
|
# on SSL socket read timeout; re-try the loop in this case.
|
|
continue
|
|
except (EOFError, IOError):
|
|
break
|
|
|
|
if len(ret) == 0:
|
|
break
|
|
|
|
buf += ret
|
|
buf_len = len(buf)
|
|
while buf_len >= min_read_len:
|
|
(version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
|
|
if (msg_len < min_read_len):
|
|
# Someone isn't playing nicely; log it, and try something sane.
|
|
LOG.debug("Message with invalid length %s received from switch at address %s",
|
|
msg_len, self.address)
|
|
msg_len = min_read_len
|
|
if buf_len < msg_len:
|
|
remaining_read_len = (msg_len - buf_len)
|
|
break
|
|
|
|
msg = ofproto_parser.msg(
|
|
self, version, msg_type, msg_len, xid, buf[:msg_len])
|
|
# LOG.debug('queue msg %s cls %s', msg, msg.__class__)
|
|
if msg:
|
|
ev = ofp_event.ofp_msg_to_ev(msg)
|
|
self.ofp_brick.send_event_to_observers(ev, self.state)
|
|
|
|
dispatchers = lambda x: x.callers[ev.__class__].dispatchers
|
|
handlers = [handler for handler in
|
|
self.ofp_brick.get_handlers(ev) if
|
|
self.state in dispatchers(handler)]
|
|
for handler in handlers:
|
|
handler(ev)
|
|
|
|
buf = buf[msg_len:]
|
|
buf_len = len(buf)
|
|
remaining_read_len = min_read_len
|
|
|
|
# We need to schedule other greenlets. Otherwise, ryu
|
|
# can't accept new switches or handle the existing
|
|
# switches. The limit is arbitrary. We need the better
|
|
# approach in the future.
|
|
count += 1
|
|
if count > 2048:
|
|
count = 0
|
|
hub.sleep(0)
|
|
|
|
def _send_loop(self):
|
|
try:
|
|
while self.state != DEAD_DISPATCHER:
|
|
buf = self.send_q.get()
|
|
self._send_q_sem.release()
|
|
self.socket.sendall(buf)
|
|
except SocketTimeout:
|
|
LOG.debug("Socket timed out while sending data to switch at address %s",
|
|
self.address)
|
|
except IOError as ioe:
|
|
# Convert ioe.errno to a string, just in case it was somehow set to None.
|
|
errno = "%s" % ioe.errno
|
|
LOG.debug("Socket error while sending data to switch at address %s: [%s] %s",
|
|
self.address, errno, ioe.strerror)
|
|
finally:
|
|
q = self.send_q
|
|
# First, clear self.send_q to prevent new references.
|
|
self.send_q = None
|
|
# Now, drain the send_q, releasing the associated semaphore for each entry.
|
|
# This should release all threads waiting to acquire the semaphore.
|
|
try:
|
|
while q.get(block=False):
|
|
self._send_q_sem.release()
|
|
except hub.QueueEmpty:
|
|
pass
|
|
# Finally, ensure the _recv_loop terminates.
|
|
self.close()
|
|
|
|
def send(self, buf):
|
|
msg_enqueued = False
|
|
self._send_q_sem.acquire()
|
|
if self.send_q:
|
|
self.send_q.put(buf)
|
|
msg_enqueued = True
|
|
else:
|
|
self._send_q_sem.release()
|
|
if not msg_enqueued:
|
|
LOG.debug('Datapath in process of terminating; send() to %s discarded.',
|
|
self.address)
|
|
return msg_enqueued
|
|
|
|
def set_xid(self, msg):
|
|
self.xid += 1
|
|
self.xid &= self.ofproto.MAX_XID
|
|
msg.set_xid(self.xid)
|
|
return self.xid
|
|
|
|
def send_msg(self, msg):
|
|
assert isinstance(msg, self.ofproto_parser.MsgBase)
|
|
if msg.xid is None:
|
|
self.set_xid(msg)
|
|
msg.serialize()
|
|
# LOG.debug('send_msg %s', msg)
|
|
return self.send(msg.buf)
|
|
|
|
def _echo_request_loop(self):
|
|
if not self.max_unreplied_echo_requests:
|
|
return
|
|
while (self.send_q and
|
|
(len(self.unreplied_echo_requests) <= self.max_unreplied_echo_requests)):
|
|
echo_req = self.ofproto_parser.OFPEchoRequest(self)
|
|
self.unreplied_echo_requests.append(self.set_xid(echo_req))
|
|
self.send_msg(echo_req)
|
|
hub.sleep(self.echo_request_interval)
|
|
self.close()
|
|
|
|
def acknowledge_echo_reply(self, xid):
|
|
try:
|
|
self.unreplied_echo_requests.remove(xid)
|
|
except:
|
|
pass
|
|
|
|
def serve(self):
|
|
send_thr = hub.spawn(self._send_loop)
|
|
|
|
# send hello message immediately
|
|
hello = self.ofproto_parser.OFPHello(self)
|
|
self.send_msg(hello)
|
|
|
|
echo_thr = hub.spawn(self._echo_request_loop)
|
|
|
|
try:
|
|
self._recv_loop()
|
|
finally:
|
|
hub.kill(send_thr)
|
|
hub.kill(echo_thr)
|
|
hub.joinall([send_thr, echo_thr])
|
|
self.is_active = False
|
|
|
|
#
|
|
# Utility methods for convenience
|
|
#
|
|
def send_packet_out(self, buffer_id=0xffffffff, in_port=None,
|
|
actions=None, data=None):
|
|
if in_port is None:
|
|
in_port = self.ofproto.OFPP_NONE
|
|
packet_out = self.ofproto_parser.OFPPacketOut(
|
|
self, buffer_id, in_port, actions, data)
|
|
self.send_msg(packet_out)
|
|
|
|
def send_flow_mod(self, rule, cookie, command, idle_timeout, hard_timeout,
|
|
priority=None, buffer_id=0xffffffff,
|
|
out_port=None, flags=0, actions=None):
|
|
if priority is None:
|
|
priority = self.ofproto.OFP_DEFAULT_PRIORITY
|
|
if out_port is None:
|
|
out_port = self.ofproto.OFPP_NONE
|
|
flow_format = rule.flow_format()
|
|
assert (flow_format == ofproto_v1_0.NXFF_OPENFLOW10 or
|
|
flow_format == ofproto_v1_0.NXFF_NXM)
|
|
if self.flow_format < flow_format:
|
|
self.send_nxt_set_flow_format(flow_format)
|
|
if flow_format == ofproto_v1_0.NXFF_OPENFLOW10:
|
|
match_tuple = rule.match_tuple()
|
|
match = self.ofproto_parser.OFPMatch(*match_tuple)
|
|
flow_mod = self.ofproto_parser.OFPFlowMod(
|
|
self, match, cookie, command, idle_timeout, hard_timeout,
|
|
priority, buffer_id, out_port, flags, actions)
|
|
else:
|
|
flow_mod = self.ofproto_parser.NXTFlowMod(
|
|
self, cookie, command, idle_timeout, hard_timeout,
|
|
priority, buffer_id, out_port, flags, rule, actions)
|
|
self.send_msg(flow_mod)
|
|
|
|
def send_flow_del(self, rule, cookie, out_port=None):
|
|
self.send_flow_mod(rule=rule, cookie=cookie,
|
|
command=self.ofproto.OFPFC_DELETE,
|
|
idle_timeout=0, hard_timeout=0, priority=0,
|
|
out_port=out_port)
|
|
|
|
def send_delete_all_flows(self):
|
|
rule = nx_match.ClsRule()
|
|
self.send_flow_mod(
|
|
rule=rule, cookie=0, command=self.ofproto.OFPFC_DELETE,
|
|
idle_timeout=0, hard_timeout=0, priority=0, buffer_id=0,
|
|
out_port=self.ofproto.OFPP_NONE, flags=0, actions=None)
|
|
|
|
def send_barrier(self):
|
|
barrier_request = self.ofproto_parser.OFPBarrierRequest(self)
|
|
return self.send_msg(barrier_request)
|
|
|
|
def send_nxt_set_flow_format(self, flow_format):
|
|
assert (flow_format == ofproto_v1_0.NXFF_OPENFLOW10 or
|
|
flow_format == ofproto_v1_0.NXFF_NXM)
|
|
if self.flow_format == flow_format:
|
|
# Nothing to do
|
|
return
|
|
self.flow_format = flow_format
|
|
set_format = self.ofproto_parser.NXTSetFlowFormat(self, flow_format)
|
|
# FIXME: If NXT_SET_FLOW_FORMAT or NXFF_NXM is not supported by
|
|
# the switch then an error message will be received. It may be
|
|
# handled by setting self.flow_format to
|
|
# ofproto_v1_0.NXFF_OPENFLOW10 but currently isn't.
|
|
self.send_msg(set_format)
|
|
self.send_barrier()
|
|
|
|
def is_reserved_port(self, port_no):
|
|
return port_no > self.ofproto.OFPP_MAX
|
|
|
|
|
|
def datapath_connection_factory(socket, address):
|
|
LOG.debug('connected socket:%s address:%s', socket, address)
|
|
with contextlib.closing(Datapath(socket, address)) as datapath:
|
|
try:
|
|
datapath.serve()
|
|
except:
|
|
# Something went wrong.
|
|
# Especially malicious switch can send malformed packet,
|
|
# the parser raise exception.
|
|
# Can we do anything more graceful?
|
|
if datapath.id is None:
|
|
dpid_str = "%s" % datapath.id
|
|
else:
|
|
dpid_str = dpid_to_str(datapath.id)
|
|
LOG.error("Error in the datapath %s from %s", dpid_str, address)
|
|
raise
|