deb-ryu/ryu/app/quantum_adapter.py
Isaku Yamahata 66b0c290be app/quantum_adapter: Lazily initialize neutron client
When ryu starts up, related openstack components (keystone and neutron)
might not be running. They might be during start up process.
In that case, quantum_adapter results in exception as follows.
So in order to avoid ordering of starting up, lazily initialize neutoron
api.

> hub: uncaught exception: Traceback (most recent call last):
>   File "/ryu/lib/hub.py", line 48, in _launch
>     func(*args, **kwargs)
>   File "/ryu/base/app_manager.py", line 173, in _event_loop
>     handler(ev)
>   File "/ryu/app/quantum_adapter.py", line 398, in dp_handler
>     ovs_switch = self._get_ovs_switch(dpid)
>   File "/ryu/app/quantum_adapter.py", line 381, in _get_ovs_switch
>     ovs_switch = OVSSwitch(dpid, self.nw, self.ifaces, self.logger)
>   File "/ryu/app/quantum_adapter.py", line 167, in __init__
>     token = _get_auth_token(logger)
>   File "/ryu/app/quantum_adapter.py", line 90, in _get_auth_token
>     httpclient.authenticate()
>   File "/neutronclient/client.py", line 211, in authenticate
>     content_type="application/json")
>   File "/neutronclient/client.py", line 141, in _cs_request
>     raise exceptions.ConnectionFailed(reason=e)
> ConnectionFailed: Connection to neutron failed: [Errno 111] ECONNREFUSED

Signed-off-by: Isaku Yamahata <yamahata@valinux.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-07-17 14:15:05 +09:00

443 lines
17 KiB
Python

# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
# Copyright (C) 2012 Isaku Yamahata <yamahata at private email ne jp>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import traceback
from oslo.config import cfg
try:
from neutronclient import client as q_client
from neutronclient.common import exceptions as q_exc
from neutronclient.common.exceptions import (NeutronClientException as
client_exc)
from neutronclient.v2_0 import client as q_clientv2
except ImportError:
from quantumclient import client as q_client
from quantumclient.common import exceptions as q_exc
from quantumclient.common.exceptions import (QuantumClientException as
client_exc)
from quantumclient.v2_0 import client as q_clientv2
from ryu.app import conf_switch_key as cs_key
from ryu.app import rest_nw_id
from ryu.base import app_manager
from ryu.controller import (conf_switch,
dpset,
handler,
network)
from ryu import exception as ryu_exc
from ryu.lib import dpid as dpid_lib
from ryu.lib import mac as mac_lib
from ryu.lib import quantum_ifaces
from ryu.lib.ovs import bridge
from ryu.lib.quantum_ifaces import QuantumIfaces
CONF = cfg.CONF
def _get_auth_token(logger):
httpclient = q_client.HTTPClient(
username=CONF.neutron_admin_username,
tenant_name=CONF.neutron_admin_tenant_name,
password=CONF.neutron_admin_password,
auth_url=CONF.neutron_admin_auth_url,
timeout=CONF.neutron_url_timeout,
auth_strategy=CONF.neutron_auth_strategy)
try:
httpclient.authenticate()
except (q_exc.Unauthorized, q_exc.Forbidden, q_exc.EndpointNotFound) as e:
logger.error("authentication failure: %s", e)
return None
# logger.debug("_get_auth_token: token=%s", httpclient.auth_token)
return httpclient.auth_token
def _get_quantum_client(token):
if token:
my_client = q_clientv2.Client(
endpoint_url=CONF.neutron_url,
token=token, timeout=CONF.neutron_url_timeout)
else:
my_client = q_clientv2.Client(
endpoint_url=CONF.neutron_url,
auth_strategy=None, timeout=CONF.neutron_url_timeout)
return my_client
class OVSPort(object):
PORT_ERROR = -1
PORT_UNKNOWN = 0
PORT_GATEWAY = 1
PORT_VETH_GATEWAY = 2
PORT_GUEST = 3
PORT_TUNNEL = 4
# extra-ids: 'attached-mac', 'iface-id', 'iface-status', 'vm-uuid'
def __init__(self, ofport, port_name):
super(OVSPort, self).__init__()
self.ofport = ofport
self.name = port_name
self.type = None
self.ext_ids = {}
self.options = {}
def update(self, port):
self.__dict__.update((key, port[key]) for key
in ['name', 'ofport', 'type']
if key in port)
if 'external_ids' in port:
self.ext_ids = dict(port['external_ids'])
if 'options' in port:
self.options = dict(port['options'])
def get_port_type(self):
if not isinstance(self.ofport, int):
return self.PORT_ERROR
if self.type == 'internal' and 'iface-id' in self.ext_ids:
return self.PORT_GATEWAY
if self.type == '' and 'iface-id' in self.ext_ids:
return self.PORT_VETH_GATEWAY
if (self.type == 'gre' and 'local_ip' in self.options and
'remote_ip' in self.options):
return self.PORT_TUNNEL
if self.type == '' and 'vm-uuid' in self.ext_ids:
return self.PORT_GUEST
return self.PORT_UNKNOWN
def __str__(self):
return "type=%s ofport=%s name=%s, ext_ids=%s options=%s" % (
self.type, self.ofport, self.name, self.ext_ids, self.options)
def __eq__(self, other):
return (other is not None and
self.ofport == other.ofport and
self.type == other.type and
self.ext_ids == other.ext_ids and
self.options == other.options)
class OVSSwitch(object):
def __init__(self, dpid, nw, ifaces, logger):
# TODO: clean up
self.dpid = dpid
self.network_api = nw
self.ifaces = ifaces
self.logger = logger
self._q_api = None # lazy initialization
self.ctrl_addr = CONF.neutron_controller_addr
if not self.ctrl_addr:
raise ValueError('option neutron_controler_addr must be speicfied')
self.ovsdb_addr = None
self.tunnel_ip = None
self.ovs_bridge = None
self.ports = {} # port_no -> OVSPort
super(OVSSwitch, self).__init__()
@property
def q_api(self):
if self._q_api is None:
token = None
if CONF.neutron_auth_strategy:
token = _get_auth_token(self.logger)
self._q_api = _get_quantum_client(token)
return self._q_api
def set_ovsdb_addr(self, dpid, ovsdb_addr):
# easy check if the address format valid
self.logger.debug('set_ovsdb_addr dpid %s ovsdb_addr %s',
dpid_lib.dpid_to_str(dpid), ovsdb_addr)
_proto, _host, _port = ovsdb_addr.split(':')
old_address = self.ovsdb_addr
if old_address == ovsdb_addr:
return
if ovsdb_addr is None:
# TODO: clean up this ovs switch
if self.ovs_bridge:
self.ovs_bridge.del_controller()
self.ovs_bridge = None
return
self.ovsdb_addr = ovsdb_addr
if self.ovs_bridge is None:
self.logger.debug('ovsdb: adding ports %s', self.ports)
ovs_bridge = bridge.OVSBridge(dpid, ovsdb_addr)
self.ovs_bridge = ovs_bridge
ovs_bridge.init()
# TODO: for multi-controller
# not overwrite controllers, but append this controller
ovs_bridge.set_controller([self.ctrl_addr])
for port in self.ports.values():
self.logger.debug('adding port %s', port)
self.update_port(port.ofport, port.name, True)
def _update_external_port(self, port, add=True):
if add:
self.network_api.update_port(rest_nw_id.NW_ID_EXTERNAL,
self.dpid, port.ofport)
else:
self.network_api.remove_port(rest_nw_id.NW_ID_EXTERNAL,
self.dpid, port.ofport)
def _update_vif_port(self, port, add=True):
# When ovs port is updated, the corresponding network id may or
# may not exist because the order between the notification of
# ovs port deletion via OVSDB protocol and the notification
# network id/port deletion via REST from quantum plugin
# isn't deterministic.
self.logger.debug("_update_vif_port: %s %s", port, add)
iface_id = port.ext_ids.get('iface-id')
if iface_id is None:
return
try:
network_id = self.ifaces.get_key(iface_id,
QuantumIfaces.KEY_NETWORK_ID)
except KeyError:
return
if not add:
try:
self.network_api.remove_port(network_id,
self.dpid, port.ofport)
except (network.NetworkNotFound, ryu_exc.PortNotFound) as e:
self.logger.debug('remove_port %s', traceback.format_exc())
ports = self.ifaces.get_key(iface_id, QuantumIfaces.KEY_PORTS)
other_ovs_ports = None
for p in ports:
dpid = p.get(QuantumIfaces.SUBKEY_DATAPATH_ID)
if dpid is None:
continue
if dpid != self.dpid:
continue
other_ovs_ports = self.ifaces.del_key(iface_id,
QuantumIfaces.KEY_PORTS,
p)
if other_ovs_ports:
# When live-migration, one of the two OVS ports is deleted.
return
port_data = {
'status': 'DOWN'
}
body = {'port': port_data}
# self.logger.debug("port-body = %s", body)
try:
self.q_api.update_port(port.ext_ids['iface-id'], body)
except (q_exc.ConnectionFailed, client_exc) as e:
self.logger.error("quantum update port failed: %s", e)
# TODO: When authentication failure occurred,
# it should get auth token again
return
# update {network, port, mac}
try:
self.network_api.update_network(network_id)
self.network_api.update_port(network_id, self.dpid, port.ofport)
mac = port.ext_ids.get('attached-mac')
if mac:
self.network_api.update_mac(network_id, self.dpid, port.ofport,
mac_lib.haddr_to_bin(mac))
except (network.NetworkNotFound, ryu_exc.PortNotFound) as e:
self.logger.debug('update network/port/mac %s',
traceback.format_exc())
def update_port(self, port_no, port_name, add):
self.logger.debug('update_port port_no %d %s %s', port_no, port_name,
add)
assert port_name is not None
old_port = self.ports.get(port_no)
if not add:
new_port = None
self.ports.pop(port_no, None)
else:
new_port = OVSPort(port_no, port_name)
if self.ovs_bridge:
port_cfg = self.ovs_bridge.get_quantum_ports(port_name)
if port_cfg:
if 'ofport' not in port_cfg or not port_cfg['ofport']:
port_cfg['ofport'] = port_no
elif port_cfg['ofport'] != port_no:
self.logger.warn('inconsistent port_no: %d port_cfg '
'%s', port_no, port_cfg)
return
if port_cfg['name'] != port_name:
self.logger.warn('inconsistent port_name: %s '
'port_cfg %s', port_name, port_cfg)
return
new_port.update(port_cfg)
self.ports[port_no] = new_port
iface_id = new_port.ext_ids.get('iface-id')
if iface_id:
p = {QuantumIfaces.SUBKEY_DATAPATH_ID: self.dpid,
QuantumIfaces.SUBKEY_OFPORT: port_no,
QuantumIfaces.SUBKEY_NAME: port_name}
self.ifaces.update_key(iface_id, QuantumIfaces.KEY_PORTS, p)
if old_port == new_port:
return
if not new_port:
port_type = old_port.get_port_type()
if port_type == OVSPort.PORT_ERROR:
return
elif port_type == OVSPort.PORT_UNKNOWN:
# self.logger.info("delete external port: %s", old_port)
self._update_external_port(old_port, add=False)
else:
# self.logger.info("delete port: %s", old_port)
if port_type != OVSPort.PORT_TUNNEL:
self._update_vif_port(old_port, add=False)
return
if new_port.ofport == -1:
return
if not old_port or old_port.ofport == -1:
port_type = new_port.get_port_type()
if port_type == OVSPort.PORT_ERROR:
return
elif port_type == OVSPort.PORT_UNKNOWN:
# self.logger.info("create external port: %s", new_port)
self._update_external_port(new_port)
else:
# self.logger.info("create port: %s", new_port)
if port_type != OVSPort.PORT_TUNNEL:
self._update_vif_port(new_port)
return
if new_port.get_port_type() in (OVSPort.PORT_GUEST,
OVSPort.PORT_GATEWAY,
OVSPort.PORT_VETH_GATEWAY):
# self.logger.info("update port: %s", new_port)
self._update_vif_port(new_port)
class QuantumAdapter(app_manager.RyuApp):
_CONTEXTS = {
'conf_switch': conf_switch.ConfSwitchSet,
'network': network.Network,
'quantum_ifaces': quantum_ifaces.QuantumIfaces,
}
def __init__(self, *_args, **kwargs):
super(QuantumAdapter, self).__init__()
self.cs = kwargs['conf_switch']
self.nw = kwargs['network']
self.ifaces = kwargs['quantum_ifaces']
self.dps = {}
for network_id in rest_nw_id.RESERVED_NETWORK_IDS:
if network_id == rest_nw_id.NW_ID_UNKNOWN:
continue
self.nw.update_network(network_id)
def _get_ovs_switch(self, dpid, create=True):
ovs_switch = self.dps.get(dpid)
if not ovs_switch:
if create:
ovs_switch = OVSSwitch(dpid, self.nw, self.ifaces, self.logger)
self.dps[dpid] = ovs_switch
else:
self.logger.debug('ovs switch %s is already known', dpid)
return ovs_switch
def _port_handler(self, dpid, port_no, port_name, add):
ovs_switch = self._get_ovs_switch(dpid)
if ovs_switch:
ovs_switch.update_port(port_no, port_name, add)
else:
self.logger.warn('unknown ovs switch %s %s %s %s\n',
dpid, port_no, port_name, add)
@handler.set_ev_cls(dpset.EventDP)
def dp_handler(self, ev):
dpid = ev.dp.id
ovs_switch = self._get_ovs_switch(dpid)
if not ovs_switch:
return
if ev.enter:
for port in ev.ports:
ovs_switch.update_port(port.port_no, port.name, True)
else:
# When dp leaving, we don't delete ports because OF connection
# can be disconnected for some reason.
# TODO: configuration needed to tell that this dp is really
# removed.
self.dps.pop(dpid, None)
@handler.set_ev_cls(dpset.EventPortAdd)
def port_add_handler(self, ev):
port = ev.port
name = port.name.rstrip('\0')
self._port_handler(ev.dp.id, port.port_no, name, True)
@handler.set_ev_cls(dpset.EventPortDelete)
def port_del_handler(self, ev):
port = ev.port
name = port.name.rstrip('\0')
self._port_handler(ev.dp.id, port.port_no, name, False)
def _conf_switch_set_ovsdb_addr(self, dpid, value):
ovs_switch = self._get_ovs_switch(dpid)
ovs_switch.set_ovsdb_addr(dpid, value)
def _conf_switch_del_ovsdb_addr(self, dpid):
ovs_switch = self._get_ovs_switch(dpid, False)
if ovs_switch:
ovs_switch.set_ovsdb_addr(dpid, None)
@handler.set_ev_cls(conf_switch.EventConfSwitchSet)
def conf_switch_set_handler(self, ev):
self.logger.debug("conf_switch set: %s", ev)
if ev.key == cs_key.OVSDB_ADDR:
self._conf_switch_set_ovsdb_addr(ev.dpid, ev.value)
else:
self.logger.debug("unknown event: %s", ev)
@handler.set_ev_cls(conf_switch.EventConfSwitchDel)
def conf_switch_del_handler(self, ev):
self.logger.debug("conf_switch del: %s", ev)
if ev.key == cs_key.OVSDB_ADDR:
self._conf_switch_del_ovsdb_addr(ev.dpid)
else:
self.logger.debug("unknown event: %s", ev)
@handler.set_ev_cls(quantum_ifaces.EventQuantumIfaceSet)
def quantum_iface_set_handler(self, ev):
if ev.key != quantum_ifaces.QuantumIfaces.KEY_NETWORK_ID:
# self.logger.debug("unknown key %s", ev.key)
return
iface_id = ev.iface_id
try:
ports = self.ifaces.get_key(iface_id, QuantumIfaces.KEY_PORTS)
except KeyError:
return
for p in ports:
try:
dpid = p[QuantumIfaces.SUBKEY_DATAPATH_ID]
ofport = p[QuantumIfaces.SUBKEY_OFPORT]
port_name = p[QuantumIfaces.SUBKEY_NAME]
except KeyError:
continue
ovs_switch = self._get_ovs_switch(dpid, False)
if ovs_switch:
ovs_switch.update_port(ofport, port_name, True)