# Copyright (C) 2014,2015 VA Linux Systems Japan K.K. # Copyright (C) 2014,2015 YAMAMOTO Takashi # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import functools import random import eventlet import netaddr from neutron_lib import exceptions import os_ken.app.ofctl.api as ofctl_api import os_ken.exception as os_ken_exc from os_ken.lib import ofctl_string from os_ken.ofproto import ofproto_parser from oslo_config import cfg from oslo_log import log as logging from oslo_utils import excutils from oslo_utils import timeutils import six from neutron._i18n import _ from neutron.agent.common import ovs_lib LOG = logging.getLogger(__name__) BUNDLE_ID_WIDTH = 1 << 32 COOKIE_DEFAULT = object() class ActiveBundleRunning(exceptions.NeutronException): message = _("Another active bundle 0x%(bundle_id)x is running") class OpenFlowSwitchMixin(object): """Mixin to provide common convenient routines for an openflow switch. NOTE(yamamoto): super() points to ovs_lib.OVSBridge. See ovs_bridge.py how this class is actually used. """ @staticmethod def _cidr_to_os_ken(ip): n = netaddr.IPNetwork(ip) if n.hostmask: return (str(n.ip), str(n.netmask)) return str(n.ip) def __init__(self, *args, **kwargs): self._app = kwargs.pop('os_ken_app') self.active_bundles = set() super(OpenFlowSwitchMixin, self).__init__(*args, **kwargs) def _get_dp_by_dpid(self, dpid_int): """Get os-ken datapath object for the switch.""" timeout_sec = cfg.CONF.OVS.of_connect_timeout start_time = timeutils.now() while True: dp = ofctl_api.get_datapath(self._app, dpid_int) if dp is not None: break # The switch has not established a connection to us; retry again # until timeout. if timeutils.now() > start_time + timeout_sec: m = _("Switch connection timeout") LOG.error(m) # NOTE(yamamoto): use RuntimeError for compat with ovs_lib raise RuntimeError(m) return dp def _send_msg(self, msg, reply_cls=None, reply_multi=False, active_bundle=None): timeout_sec = cfg.CONF.OVS.of_request_timeout timeout = eventlet.Timeout(seconds=timeout_sec) if active_bundle is not None: (dp, ofp, ofpp) = self._get_dp() msg = ofpp.ONFBundleAddMsg(dp, active_bundle['id'], active_bundle['bundle_flags'], msg, []) try: result = ofctl_api.send_msg(self._app, msg, reply_cls, reply_multi) except os_ken_exc.OSKenException as e: m = _("ofctl request %(request)s error %(error)s") % { "request": msg, "error": e, } LOG.error(m) # NOTE(yamamoto): use RuntimeError for compat with ovs_lib raise RuntimeError(m) except eventlet.Timeout as e: with excutils.save_and_reraise_exception() as ctx: if e is timeout: ctx.reraise = False m = _("ofctl request %(request)s timed out") % { "request": msg, } LOG.error(m) # NOTE(yamamoto): use RuntimeError for compat with ovs_lib raise RuntimeError(m) finally: timeout.cancel() LOG.debug("ofctl request %(request)s result %(result)s", {"request": msg, "result": result}) return result @staticmethod def _match(_ofp, ofpp, match, **match_kwargs): if match is not None: return match return ofpp.OFPMatch(**match_kwargs) def uninstall_flows(self, table_id=None, strict=False, priority=0, cookie=COOKIE_DEFAULT, cookie_mask=0, match=None, active_bundle=None, **match_kwargs): (dp, ofp, ofpp) = self._get_dp() if table_id is None: table_id = ofp.OFPTT_ALL if cookie == ovs_lib.COOKIE_ANY: cookie = 0 if cookie_mask != 0: raise Exception(_("cookie=COOKIE_ANY but cookie_mask set to " "%s") % cookie_mask) elif cookie == COOKIE_DEFAULT: cookie = self._default_cookie cookie_mask = ovs_lib.UINT64_BITMASK match = self._match(ofp, ofpp, match, **match_kwargs) if strict: cmd = ofp.OFPFC_DELETE_STRICT else: cmd = ofp.OFPFC_DELETE msg = ofpp.OFPFlowMod(dp, command=cmd, cookie=cookie, cookie_mask=cookie_mask, table_id=table_id, match=match, priority=priority, out_group=ofp.OFPG_ANY, out_port=ofp.OFPP_ANY) self._send_msg(msg, active_bundle=active_bundle) def dump_flows(self, table_id=None): (dp, ofp, ofpp) = self._get_dp() if table_id is None: table_id = ofp.OFPTT_ALL msg = ofpp.OFPFlowStatsRequest(dp, table_id=table_id) replies = self._send_msg(msg, reply_cls=ofpp.OFPFlowStatsReply, reply_multi=True) flows = [] for rep in replies: flows += rep.body return flows def _dump_and_clean(self, table_id=None): cookies = set([f.cookie for f in self.dump_flows(table_id)]) - \ self.reserved_cookies for c in cookies: LOG.warning("Deleting flow with cookie 0x%(cookie)x", {'cookie': c}) self.uninstall_flows(cookie=c, cookie_mask=ovs_lib.UINT64_BITMASK) def cleanup_flows(self): LOG.info("Reserved cookies for %s: %s", self.br_name, self.reserved_cookies) for table_id in self.of_tables: self._dump_and_clean(table_id) def install_goto_next(self, table_id, active_bundle=None): self.install_goto(table_id=table_id, dest_table_id=table_id + 1, active_bundle=active_bundle) def install_output(self, port, table_id=0, priority=0, match=None, **match_kwargs): (_dp, ofp, ofpp) = self._get_dp() actions = [ofpp.OFPActionOutput(port, 0)] instructions = [ofpp.OFPInstructionActions( ofp.OFPIT_APPLY_ACTIONS, actions)] self.install_instructions(table_id=table_id, priority=priority, instructions=instructions, match=match, **match_kwargs) def install_normal(self, table_id=0, priority=0, match=None, **match_kwargs): (_dp, ofp, _ofpp) = self._get_dp() self.install_output(port=ofp.OFPP_NORMAL, table_id=table_id, priority=priority, match=match, **match_kwargs) def install_goto(self, dest_table_id, table_id=0, priority=0, match=None, **match_kwargs): (_dp, _ofp, ofpp) = self._get_dp() instructions = [ofpp.OFPInstructionGotoTable(table_id=dest_table_id)] self.install_instructions(table_id=table_id, priority=priority, instructions=instructions, match=match, **match_kwargs) def install_drop(self, table_id=0, priority=0, match=None, **match_kwargs): self.install_instructions(table_id=table_id, priority=priority, instructions=[], match=match, **match_kwargs) def install_instructions(self, instructions, table_id=0, priority=0, match=None, active_bundle=None, **match_kwargs): (dp, ofp, ofpp) = self._get_dp() match = self._match(ofp, ofpp, match, **match_kwargs) if isinstance(instructions, six.string_types): # NOTE: instructions must be str for the ofctl of_interface. # After the ofctl driver is removed, a deprecation warning # could be added here. jsonlist = ofctl_string.ofp_instruction_from_str( ofp, instructions) instructions = ofproto_parser.ofp_instruction_from_jsondict( dp, jsonlist) msg = ofpp.OFPFlowMod(dp, table_id=table_id, cookie=self.default_cookie, match=match, priority=priority, instructions=instructions) self._send_msg(msg, active_bundle=active_bundle) def install_apply_actions(self, actions, table_id=0, priority=0, match=None, **match_kwargs): (dp, ofp, ofpp) = self._get_dp() instructions = [ ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, actions), ] self.install_instructions(table_id=table_id, priority=priority, match=match, instructions=instructions, **match_kwargs) def bundled(self, atomic=False, ordered=False): return BundledOpenFlowBridge(self, atomic, ordered) class BundledOpenFlowBridge(object): def __init__(self, br, atomic, ordered): self.br = br self.active_bundle = None self.bundle_flags = 0 if not atomic and not ordered: return (dp, ofp, ofpp) = self.br._get_dp() if atomic: self.bundle_flags |= ofp.ONF_BF_ATOMIC if ordered: self.bundle_flags |= ofp.ONF_BF_ORDERED def __getattr__(self, name): if name.startswith('install') or name.startswith('uninstall'): under = getattr(self.br, name) if self.active_bundle is None: return under return functools.partial(under, active_bundle=dict( id=self.active_bundle, bundle_flags=self.bundle_flags)) raise AttributeError(_("Only install_* or uninstall_* methods " "can be used")) def __enter__(self): if self.active_bundle is not None: raise ActiveBundleRunning(bundle_id=self.active_bundle) while True: self.active_bundle = random.randrange(BUNDLE_ID_WIDTH) if self.active_bundle not in self.br.active_bundles: self.br.active_bundles.add(self.active_bundle) break try: (dp, ofp, ofpp) = self.br._get_dp() msg = ofpp.ONFBundleCtrlMsg(dp, self.active_bundle, ofp.ONF_BCT_OPEN_REQUEST, self.bundle_flags, []) reply = self.br._send_msg(msg, reply_cls=ofpp.ONFBundleCtrlMsg) if reply.type != ofp.ONF_BCT_OPEN_REPLY: raise RuntimeError( _("Unexpected reply type %d != ONF_BCT_OPEN_REPLY") % reply.type) return self except Exception: self.br.active_bundles.remove(self.active_bundle) self.active_bundle = None raise def __exit__(self, type, value, traceback): (dp, ofp, ofpp) = self.br._get_dp() if type is None: ctrl_type = ofp.ONF_BCT_COMMIT_REQUEST expected_reply = ofp.ONF_BCT_COMMIT_REPLY else: ctrl_type = ofp.ONF_BCT_DISCARD_REQUEST expected_reply = ofp.ONF_BCT_DISCARD_REPLY LOG.warning( "Discarding bundle with ID 0x%(id)x due to an exception", {'id': self.active_bundle}) try: msg = ofpp.ONFBundleCtrlMsg(dp, self.active_bundle, ctrl_type, self.bundle_flags, []) reply = self.br._send_msg(msg, reply_cls=ofpp.ONFBundleCtrlMsg) if reply.type != expected_reply: # The bundle ID may be in a bad state. Let's leave it # in active_bundles so that we will never use it again. raise RuntimeError(_("Unexpected reply type %d") % reply.type) self.br.active_bundles.remove(self.active_bundle) finally: # It is possible the bundle is kept open, but this must be # cleared or all subsequent __enter__ will fail. self.active_bundle = None