[log] ovs fw logging implementation

This patch implements ovs firewall logging driver for security group
base discussed on the spec [1] and [2]

[1] https://specs.openstack.org/openstack/neutron-specs/specs/pike/logging-API-for-security-group-rules.html
[2] https://docs.google.com/presentation/d/1fteBesETsmA7CWV6wf1i2QKa7k8EHPpRjytj8Rzeb-A/edit#slide=id.p

Change-Id: Ib8668dd25ee7c5000a6dafcc7db3dbc33ad190be
Co-Authored-By: IWAMOTO Toshihiro <iwamoto@valinux.co.jp>
Co-Authored-By: Yushiro FURUKAWA <y.furukawa_2@jp.fujitsu.com>
Partially-implements: blueprint security-group-logging
Related-Bug: #1468366
This commit is contained in:
Nguyen Phuong An 2016-11-09 17:02:48 +07:00
parent bab1ae8812
commit 7bd8b37e38
17 changed files with 977 additions and 12 deletions

View File

@ -64,6 +64,9 @@ QOS_DEFAULT_QUEUE = 0
_SENTINEL = object()
CTRL_RATE_LIMIT_MIN = 100
CTRL_BURST_LIMIT_MIN = 25
def _ovsdb_result_pending(result):
"""Return True if ovs-vsctl indicates the result is still pending."""
@ -656,14 +659,7 @@ class OVSBridge(BaseOVS):
:param connection_mode: "out-of-band" or "in-band"
"""
attr = [('connection_mode', connection_mode)]
controllers = self.db_get_val('Bridge', self.br_name, 'controller')
controllers = [controllers] if isinstance(
controllers, uuid.UUID) else controllers
with self.ovsdb.transaction(check_error=True) as txn:
for controller_uuid in controllers:
txn.add(self.ovsdb.db_set('Controller',
controller_uuid, *attr))
self.set_controller_field('connection_mode', connection_mode)
def _set_egress_bw_limit_for_port(self, port_name, max_kbps,
max_burst_kbps):
@ -821,6 +817,38 @@ class OVSBridge(BaseOVS):
if queue:
txn.add(self.ovsdb.db_destroy('Queue', queue['_uuid']))
def set_controller_field(self, field, value):
attr = [(field, value)]
controllers = self.db_get_val('Bridge', self.br_name, 'controller')
controllers = [controllers] if isinstance(
controllers, uuid.UUID) else controllers
with self.ovsdb.transaction(check_error=True) as txn:
for controller_uuid in controllers:
txn.add(self.ovsdb.db_set(
'Controller', controller_uuid, *attr))
def set_controller_rate_limit(self, controller_rate_limit):
"""Set bridge controller_rate_limit
:param controller_rate_limit: at least 100
"""
if controller_rate_limit < CTRL_RATE_LIMIT_MIN:
LOG.info("rate limit's value must be at least 100")
controller_rate_limit = CTRL_RATE_LIMIT_MIN
self.set_controller_field(
'controller_rate_limit', controller_rate_limit)
def set_controller_burst_limit(self, controller_burst_limit):
"""Set bridge controller_burst_limit
:param controller_burst_limit: at least 25
"""
if controller_burst_limit < CTRL_BURST_LIMIT_MIN:
LOG.info("burst limit's value must be at least 25")
controller_burst_limit = CTRL_BURST_LIMIT_MIN
self.set_controller_field(
'controller_burst_limit', controller_burst_limit)
def __enter__(self):
self.create()
return self

View File

@ -34,6 +34,8 @@ CT_MARK_INVALID = '0x1'
REG_PORT = 5
REG_NET = 6
# for logging remote group rule
REG_REMOTE_GROUP = 7
PROTOCOLS_WITH_PORTS = (constants.PROTO_NAME_SCTP,
constants.PROTO_NAME_TCP,

View File

@ -55,6 +55,8 @@ def create_reg_numbers(flow_params):
"""Replace reg_(port|net) values with defined register numbers"""
_replace_register(flow_params, ovsfw_consts.REG_PORT, 'reg_port')
_replace_register(flow_params, ovsfw_consts.REG_NET, 'reg_net')
_replace_register(
flow_params, ovsfw_consts.REG_REMOTE_GROUP, 'reg_remote_group')
def get_tag_from_other_config(bridge, port_name):
@ -209,6 +211,11 @@ class SGPortMap(object):
class ConjIdMap(object):
"""Handle conjunction ID allocations and deallocations."""
def __new__(cls):
if not hasattr(cls, '_instance'):
cls._instance = super(ConjIdMap, cls).__new__(cls)
return cls._instance
def __init__(self):
self.id_map = collections.defaultdict(self._conj_id_factory)
self.id_free = collections.deque()
@ -1057,10 +1064,15 @@ class OVSFirewallDriver(firewall.FirewallDriver):
flows, 2, conj_ids):
self._add_flow(**flow)
# Install actions=accept flows.
# Install accept flows and store conj_id to reg7 for future process
for conj_id in all_conj_ids:
for flow in rules.create_conj_flows(
port, conj_id, direction, ethertype):
flow['actions'] = "set_field:{:d}->reg{:d},{:s}".format(
flow['conj_id'],
ovsfw_consts.REG_REMOTE_GROUP,
flow['actions']
)
self._add_flow(**flow)
def add_flows_from_rules(self, port):

View File

@ -44,6 +44,9 @@ class OVSBridgeCookieMixin(object):
self._reserved_cookies.add(uuid_stamp)
return uuid_stamp
def unset_cookie(self, cookie):
self._reserved_cookies.discard(cookie)
def set_agent_uuid_stamp(self, val):
self._reserved_cookies.add(val)
if self._default_cookie in self._reserved_cookies:

View File

@ -26,6 +26,7 @@ from neutron.agent import securitygroups_rpc
from neutron.plugins.ml2.drivers import mech_agent
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import constants as a_const
from neutron.services.logapi.drivers.openvswitch import driver as log_driver
from neutron.services.qos.drivers.openvswitch import driver as ovs_qos_driver
LOG = log.getLogger(__name__)
@ -62,6 +63,7 @@ class OpenvswitchMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
vif_details, supported_vnic_types=[portbindings.VNIC_NORMAL,
portbindings.VNIC_DIRECT])
ovs_qos_driver.register()
log_driver.register()
def get_allowed_network_types(self, agent):
return (agent['configurations'].get('tunnel_types', []) +

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants as const
from oslo_log import log as logging
from sqlalchemy.orm import exc as orm_exc
@ -83,6 +84,8 @@ def _get_ports_being_logged(context, sg_log):
validated_port_ids = []
ports = port_objects.Port.get_objects(context, id=port_ids)
for port in ports:
if port.status != const.PORT_STATUS_ACTIVE:
continue
if validators.validate_log_type_for_port('security_group', port):
validated_port_ids.append(port.id)
else:

View File

@ -47,3 +47,7 @@ class InvalidResourceConstraint(n_exc.InvalidInput):
class LogapiDriverException(n_exc.NeutronException):
"""A log api driver Exception"""
message = _("Driver exception: %(exception_msg)s")
class CookieNotFound(n_exc.NotFound):
message = _("Cookie %(cookie_id)s could not be found.")

View File

@ -0,0 +1,46 @@
# Copyright (c) 2017 Fujitsu Limited.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import portbindings
from oslo_log import log as logging
from neutron.services.logapi.drivers import base
LOG = logging.getLogger(__name__)
DRIVER = None
SUPPORTED_LOGGING_TYPES = ['security_group']
class OVSDriver(base.DriverBase):
@staticmethod
def create():
return OVSDriver(
name='openvswitch',
vif_types=[portbindings.VIF_TYPE_OVS,
portbindings.VIF_TYPE_VHOST_USER],
vnic_types=[portbindings.VNIC_NORMAL],
supported_logging_types=SUPPORTED_LOGGING_TYPES,
requires_rpc=True)
def register():
"""Register the driver."""
global DRIVER
if not DRIVER:
DRIVER = OVSDriver.create()
LOG.debug('Open vSwitch logging driver registered')

View File

@ -0,0 +1,38 @@
# Copyright (C) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from ryu.base import app_manager
from ryu.controller import handler
from ryu.controller import ofp_event
from ryu.ofproto import ofproto_v1_3
LOG = logging.getLogger(__name__)
class OVSLogRyuApp(app_manager.RyuApp):
OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
packet_in_handlers = []
def register_packet_in_handler(self, caller):
self.packet_in_handlers.append(caller)
def unregister_packet_in_handler(self, caller):
self.packet_in_handlers.remove(caller)
@handler.set_ev_cls(ofp_event.EventOFPPacketIn, handler.MAIN_DISPATCHER)
def packet_in_handler(self, ev):
for caller in self.packet_in_handlers:
caller(ev)

View File

@ -0,0 +1,462 @@
# Copyright (c) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from neutron_lib import constants as lib_const
from oslo_config import cfg
from oslo_log import handlers
from oslo_log import log as logging
from ryu.base import app_manager
from ryu.lib.packet import packet
from neutron.agent import firewall
from neutron.agent.linux.openvswitch_firewall import constants as ovsfw_consts
from neutron.agent.linux.openvswitch_firewall import firewall as ovsfw
from neutron.agent.linux.openvswitch_firewall import rules
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants \
as ovs_consts
from neutron.services.logapi.agent import log_extension as log_ext
from neutron.services.logapi.common import constants as log_const
from neutron.services.logapi.common import exceptions as log_exc
from neutron.services.logapi.drivers.openvswitch import log_ryuapp
LOG = logging.getLogger(__name__)
OVS_FW_TO_LOG_TABLES = {
ovs_consts.RULES_EGRESS_TABLE: ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE,
ovs_consts.RULES_INGRESS_TABLE: ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
}
FIELDS_TO_REMOVE = ['priority', 'actions', 'dl_type',
'reg_port', 'reg_remote_group']
REMOTE_RULE_PRIORITY = 70
def setup_logging():
log_file = cfg.CONF.network_log.local_output_log_base
if log_file:
from logging import handlers as watch_handler
log_file_handler = watch_handler.WatchedFileHandler(log_file)
LOG.logger.addHandler(log_file_handler)
elif cfg.CONF.use_journal:
journal_handler = handlers.OSJournalHandler()
LOG.logger.addHandler(journal_handler)
else:
syslog_handler = handlers.OSSysLogHandler()
LOG.logger.addHandler(syslog_handler)
def find_deleted_sg_rules(old_port, new_ports):
del_rules = list()
for port in new_ports:
if old_port.id == port.id:
for rule in old_port.secgroup_rules:
if rule not in port.secgroup_rules:
del_rules.append(rule)
return del_rules
return del_rules
class Cookie(object):
def __init__(self, cookie_id, port, action, project):
self.id = cookie_id
self.port = port
self.action = action
self.project = project
self.log_object_refs = set()
def __eq__(self, other):
return (self.id == other.id and
self.action == other.action and
self.port == other.port)
def __hash__(self):
return hash(self.id)
def add_log_obj_ref(self, log_id):
self.log_object_refs.add(log_id)
def remove_log_obj_ref(self, log_id):
self.log_object_refs.discard(log_id)
@property
def is_empty(self):
return not self.log_object_refs
class OFPortLog(object):
def __init__(self, port, ovs_port, log_event):
self.id = port['port_id']
self.ofport = ovs_port.ofport
self.secgroup_rules = [self._update_rule(rule) for rule in
port['security_group_rules']]
# event can be ALL, DROP and ACCEPT
self.event = log_event
def _update_rule(self, rule):
protocol = rule.get('protocol')
if protocol is not None:
if not isinstance(protocol, int) and protocol.isdigit():
rule['protocol'] = int(protocol)
elif (rule.get('ethertype') == lib_const.IPv6 and
protocol == lib_const.PROTO_NAME_ICMP):
rule['protocol'] = lib_const.PROTO_NUM_IPV6_ICMP
else:
rule['protocol'] = lib_const.IP_PROTOCOL_MAP.get(
protocol, protocol)
return rule
class OVSFirewallLoggingDriver(log_ext.LoggingDriver):
SUPPORTED_LOGGING_TYPES = ['security_group']
REQUIRED_PROTOCOLS = [
ovs_consts.OPENFLOW13,
ovs_consts.OPENFLOW14,
]
def __init__(self, integration_bridge):
self.int_br = self.initialize_bridge(integration_bridge)
self._deferred = False
self.log_ports = collections.defaultdict(dict)
self.cookies_table = set()
self.cookie_ids_to_delete = set()
self.conj_id_map = ovsfw.ConjIdMap()
def initialize(self, resource_rpc, **kwargs):
self.resource_rpc = resource_rpc
setup_logging()
self.start_logapp()
@staticmethod
def initialize_bridge(bridge):
bridge.add_protocols(*OVSFirewallLoggingDriver.REQUIRED_PROTOCOLS)
# set rate limit and burst limit for controller
bridge.set_controller_rate_limit(cfg.CONF.network_log.rate_limit)
bridge.set_controller_burst_limit(cfg.CONF.network_log.burst_limit)
return bridge.deferred(full_ordered=True)
def start_logapp(self):
app_mgr = app_manager.AppManager.get_instance()
self.log_app = app_mgr.instantiate(log_ryuapp.OVSLogRyuApp)
self.log_app.start()
self.log_app.register_packet_in_handler(self.packet_in_handler)
def packet_in_handler(self, ev):
msg = ev.msg
cookie_id = msg.cookie
pkt = packet.Packet(msg.data)
try:
cookie_entry = self._get_cookie_by_id(cookie_id)
LOG.debug("action=%s project_id=%s log_resource_ids=%s vm_port=%s "
"pkt=%s", cookie_entry.action, cookie_entry.project,
list(cookie_entry.log_object_refs),
cookie_entry.port, pkt)
except log_exc.CookieNotFound:
LOG.debug("Unknown cookie=%s packet_in pkt=%s", cookie_id, pkt)
def defer_apply_on(self):
self._deferred = True
def defer_apply_off(self):
if self._deferred:
self.int_br.apply_flows()
self._cleanup_cookies()
self._deferred = False
def _get_cookie(self, port_id, action):
for cookie in self.cookies_table:
if cookie.port == port_id and cookie.action == action:
return cookie
def _get_cookies_by_port(self, port_id):
cookies_list = []
for cookie in self.cookies_table:
if cookie.port == port_id:
cookies_list.append(cookie)
return cookies_list
def _get_cookie_by_id(self, cookie_id):
for cookie in self.cookies_table:
if str(cookie.id) == str(cookie_id):
return cookie
raise log_exc.CookieNotFound(cookie_id=cookie_id)
def _cleanup_cookies(self):
cookie_ids = self.cookie_ids_to_delete
self.cookie_ids_to_delete = set()
for cookie_id in cookie_ids:
self.int_br.br.unset_cookie(cookie_id)
def generate_cookie(self, port_id, action, log_id, project_id):
cookie = self._get_cookie(port_id, action)
if not cookie:
cookie_id = self.int_br.br.request_cookie()
cookie = Cookie(cookie_id=cookie_id, port=port_id,
action=action, project=project_id)
self.cookies_table.add(cookie)
cookie.add_log_obj_ref(log_id)
return cookie.id
def _schedule_cookie_deletion(self, cookie):
# discard a cookie object
self.cookies_table.remove(cookie)
# schedule to cleanup cookie_ids later
self.cookie_ids_to_delete.add(cookie.id)
def start_logging(self, context, **kwargs):
LOG.debug("start logging: %s", str(kwargs))
for resource_type in self.SUPPORTED_LOGGING_TYPES:
# handle port updated, agent restarted
if 'port_id' in kwargs:
self._handle_logging('_create', context,
resource_type, **kwargs)
else:
self._handle_log_resources_by_type(
'_create', context, resource_type, **kwargs)
def stop_logging(self, context, **kwargs):
LOG.debug("stop logging: %s", str(kwargs))
for resource_type in self.SUPPORTED_LOGGING_TYPES:
# handle port deleted
if 'port_id' in kwargs:
self._handle_logging('_delete', context,
resource_type, **kwargs)
else:
self._handle_log_resources_by_type(
'_delete', context, resource_type, **kwargs)
def _handle_log_resources_by_type(
self, action, context, resource_type, **kwargs):
log_resources = []
for log_obj in kwargs.get('log_resources', []):
if log_obj['resource_type'] == resource_type:
log_resources.append(log_obj)
if log_resources:
self._handle_logging(
action, context, resource_type, log_resources=log_resources)
def _handle_logging(self, action, context, resource_type, **kwargs):
handler_name = "%s_%s_log" % (action, resource_type)
handler = getattr(self, handler_name)
handler(context, **kwargs)
def create_ofport_log(self, port, log_id, log_event):
port_id = port['port_id']
ovs_port = self.int_br.br.get_vif_port_by_id(port_id)
if ovs_port:
of_port_log = OFPortLog(port, ovs_port, log_event)
self.log_ports[log_id].add(of_port_log)
def _create_security_group_log(self, context, **kwargs):
port_id = kwargs.get('port_id')
log_resources = kwargs.get('log_resources')
logs_info = []
if port_id:
# try to clean port flows log for port updated/create event
self._cleanup_port_flows_log(port_id)
logs_info = self.resource_rpc.get_sg_log_info_for_port(
context, port_id=port_id)
elif log_resources:
logs_info = self.resource_rpc.get_sg_log_info_for_log_resources(
context, log_resources=log_resources)
for log_info in logs_info:
log_id = log_info['id']
old_ofport_logs = self.log_ports.get(log_id, [])
ports = log_info.get('ports_log')
self.log_ports[log_id] = set()
for port in ports:
self.create_ofport_log(port, log_id, log_info.get('event'))
# try to clean flows log if sg_rules are deleted
for port in old_ofport_logs:
del_rules = find_deleted_sg_rules(
port, self.log_ports[log_id])
if del_rules:
self._delete_sg_rules_flow_log(port, del_rules)
for port_log in self.log_ports[log_id]:
self.add_flows_from_rules(port_log, log_info)
def _cleanup_port_flows_log(self, port_id):
cookies_list = self._get_cookies_by_port(port_id)
for cookie in cookies_list:
if cookie.action == log_const.ACCEPT_EVENT:
self._delete_flows(
table=ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
cookie=cookie.id)
self._delete_flows(
table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE,
cookie=cookie.id)
if cookie.action == log_const.DROP_EVENT:
self._delete_flows(
table=ovs_consts.DROPPED_TRAFFIC_TABLE,
cookie=cookie.id)
self._schedule_cookie_deletion(cookie)
def _delete_security_group_log(self, context, **kwargs):
# port deleted event
port_id = kwargs.get('port_id')
if port_id:
self._cleanup_port_flows_log(port_id)
# log resources deleted events
for log_resource in kwargs.get('log_resources', []):
log_id = log_resource.get('id')
of_port_logs = self.log_ports.get(log_id, [])
for of_port_log in of_port_logs:
self.delete_port_flows_log(of_port_log, log_id)
def _log_accept_flow(self, **flow):
# log first packet
flow['ct_state'] = ovsfw_consts.OF_STATE_NEW_NOT_ESTABLISHED
flow['table'] = OVS_FW_TO_LOG_TABLES[flow['table']]
flow['actions'] = 'controller'
self._add_flow(**flow)
def _add_flow(self, **kwargs):
dl_type = kwargs.get('dl_type')
ovsfw.create_reg_numbers(kwargs)
if isinstance(dl_type, int):
kwargs['dl_type'] = "0x{:04x}".format(dl_type)
LOG.debug("Add flow firewall log %s", str(kwargs))
if self._deferred:
self.int_br.add_flow(**kwargs)
else:
self.int_br.br.add_flow(**kwargs)
def _delete_flows(self, **kwargs):
ovsfw.create_reg_numbers(kwargs)
if self._deferred:
self.int_br.delete_flows(**kwargs)
else:
self.int_br.br.delete_flows(**kwargs)
def _log_drop_packet(self, port, log_id, project_id):
cookie = self.generate_cookie(port.id, log_const.DROP_EVENT,
log_id, project_id)
self._add_flow(
cookie=cookie,
table=ovs_consts.DROPPED_TRAFFIC_TABLE,
priority=53,
reg_port=port.ofport,
actions='controller'
)
def create_rules_generator_for_port(self, port):
for rule in port.secgroup_rules:
yield rule
def _create_conj_flows_log(self, remote_rule, port):
ethertype = remote_rule['ethertype']
direction = remote_rule['direction']
remote_sg_id = remote_rule['remote_group_id']
secgroup_id = remote_rule['security_group_id']
# we only want to log first accept packet, that means a packet with
# ct_state=+new-est, reg_remote_group=conj_id + 1 will be logged
flow_template = {
'priority': REMOTE_RULE_PRIORITY,
'dl_type': ovsfw_consts.ethertype_to_dl_type_map[ethertype],
'reg_port': port.ofport,
'reg_remote_group': self.conj_id_map.get_conj_id(
secgroup_id, remote_sg_id, direction, ethertype) + 1,
}
if direction == firewall.INGRESS_DIRECTION:
flow_template['table'] = ovs_consts.RULES_INGRESS_TABLE
elif direction == firewall.EGRESS_DIRECTION:
flow_template['table'] = ovs_consts.RULES_EGRESS_TABLE
return [flow_template]
def _log_accept_packet(self, port, log_id, project_id):
cookie = self.generate_cookie(port.id, log_const.ACCEPT_EVENT,
log_id, project_id)
for rule in self.create_rules_generator_for_port(port):
if 'remote_group_id' in rule:
flows = self._create_conj_flows_log(rule, port)
else:
flows = rules.create_flows_from_rule_and_port(rule, port)
for flow in flows:
flow['cookie'] = cookie
self._log_accept_flow(**flow)
def add_flows_from_rules(self, port, log_info):
# log event can be ACCEPT or DROP or ALL(both ACCEPT and DROP)
event = log_info['event']
project_id = log_info['project_id']
log_id = log_info['id']
if event == log_const.ACCEPT_EVENT:
self._log_accept_packet(port, log_id, project_id)
elif event == log_const.DROP_EVENT:
self._log_drop_packet(port, log_id, project_id)
else:
self._log_drop_packet(port, log_id, project_id)
self._log_accept_packet(port, log_id, project_id)
def _delete_accept_flows_log(self, port, log_id):
cookie = self._get_cookie(port.id, log_const.ACCEPT_EVENT)
if cookie:
cookie.remove_log_obj_ref(log_id)
if cookie.is_empty:
self._delete_flows(
table=ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
cookie=cookie.id)
self._delete_flows(
table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE,
cookie=cookie.id)
self._schedule_cookie_deletion(cookie)
def _delete_drop_flows_log(self, port, log_id):
cookie = self._get_cookie(port.id, log_const.DROP_EVENT)
if cookie:
cookie.remove_log_obj_ref(log_id)
if cookie.is_empty:
self._delete_flows(table=ovs_consts.DROPPED_TRAFFIC_TABLE,
cookie=cookie.id)
self._schedule_cookie_deletion(cookie)
def delete_port_flows_log(self, port, log_id):
"""Delete all flows log for given port and log_id"""
event = port.event
if event == log_const.ACCEPT_EVENT:
self._delete_accept_flows_log(port, log_id)
elif event == log_const.DROP_EVENT:
self._delete_drop_flows_log(port, log_id)
else:
self._delete_accept_flows_log(port, log_id)
self._delete_drop_flows_log(port, log_id)
def _delete_sg_rules_flow_log(self, port, del_rules):
cookie = self._get_cookie(port.id, log_const.ACCEPT_EVENT)
if not cookie:
return
for rule in del_rules:
if 'remote_group_id' in rule:
flows = self._create_conj_flows_log(rule, port)
else:
flows = rules.create_flows_from_rule_and_port(rule, port)
for flow in flows:
for kw in FIELDS_TO_REMOVE:
flow.pop(kw, None)
flow['table'] = OVS_FW_TO_LOG_TABLES[flow['table']]
flow['cookie'] = cookie.id
self._delete_flows(**flow)

View File

@ -958,6 +958,38 @@ class OVS_Lib_Test(base.BaseTestCase):
self.assertRaises(tenacity.RetryError,
self.br._get_port_val, '1', 'external_ids')
def test_set_controller_rate_limit(self):
with mock.patch.object(
self.br, "set_controller_field"
) as set_ctrl_field_mock:
self.br.set_controller_rate_limit(200)
set_ctrl_field_mock.assert_called_once_with(
'controller_rate_limit', 200)
def test_set_controller_rate_limit_with_value_less_than_min(self):
with mock.patch.object(
self.br, "set_controller_field"
) as set_ctrl_field_mock:
self.br.set_controller_rate_limit(50)
set_ctrl_field_mock.assert_called_once_with(
'controller_rate_limit', ovs_lib.CTRL_RATE_LIMIT_MIN)
def test_set_controller_burst_limit(self):
with mock.patch.object(
self.br, "set_controller_field"
) as set_ctrl_field_mock:
self.br.set_controller_burst_limit(100)
set_ctrl_field_mock.assert_called_once_with(
'controller_burst_limit', 100)
def test_set_controller_burst_limit_with_value_less_than_min(self):
with mock.patch.object(
self.br, "set_controller_field"
) as set_ctrl_field_mock:
self.br.set_controller_burst_limit(10)
set_ctrl_field_mock.assert_called_once_with(
'controller_burst_limit', ovs_lib.CTRL_BURST_LIMIT_MIN)
class TestDeferredOVSBridge(base.BaseTestCase):

View File

@ -41,11 +41,13 @@ class TestCreateRegNumbers(base.BaseTestCase):
ovsfw.create_reg_numbers(flow)
self.assertEqual({'foo': 'bar'}, flow)
def test_both_registers_defined(self):
flow = {'foo': 'bar', 'reg_port': 1, 'reg_net': 2}
def test_all_registers_defined(self):
flow = {'foo': 'bar', 'reg_port': 1, 'reg_net': 2,
'reg_remote_group': 3}
expected_flow = {'foo': 'bar',
'reg{:d}'.format(ovsfw_consts.REG_PORT): 1,
'reg{:d}'.format(ovsfw_consts.REG_NET): 2}
'reg{:d}'.format(ovsfw_consts.REG_NET): 2,
'reg{:d}'.format(ovsfw_consts.REG_REMOTE_GROUP): 3}
ovsfw.create_reg_numbers(flow)
self.assertEqual(expected_flow, flow)

View File

@ -42,6 +42,12 @@ class TestBRCookieOpenflow(base.BaseTestCase):
self.assertIn(default_cookie, self.br.reserved_cookies)
self.assertIn(requested_cookie, self.br.reserved_cookies)
def test_unset_cookie(self):
requested_cookie = self.br.request_cookie()
self.assertIn(requested_cookie, self.br.reserved_cookies)
self.br.unset_cookie(requested_cookie)
self.assertNotIn(requested_cookie, self.br.reserved_cookies)
def test_set_agent_uuid_stamp(self):
self.br = ovs_bridge.OVSAgentBridge('br-int')
def_cookie = self.br.default_cookie

View File

@ -0,0 +1,323 @@
# Copyright (c) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib import constants
from oslo_config import cfg
from oslo_utils import uuidutils
from neutron.agent.common import ovs_lib
from neutron.agent import firewall
from neutron.agent.linux.openvswitch_firewall import constants as ovsfw_consts
from neutron.common import constants as n_const
from neutron.objects.logapi import logging_resource as log_object
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants \
as ovs_consts
from neutron.services.logapi.common import exceptions as log_exc
from neutron.services.logapi.drivers.openvswitch \
import ovs_firewall_log as ovsfw_log
from neutron.services.logapi.rpc import agent as agent_rpc
from neutron.tests import base
from neutron.tests import tools
COOKIE_ID = uuidutils.generate_uuid()
PORT_ID = uuidutils.generate_uuid()
PROJECT_ID = uuidutils.generate_uuid()
ACTION = tools.get_random_security_event()
LOG_ID = uuidutils.generate_uuid()
SG_ID = uuidutils.generate_uuid()
REMOTE_SG_ID = uuidutils.generate_uuid()
FakeSGLogInfo = [
{
'id': LOG_ID,
'ports_log': [{'port_id': PORT_ID,
'security_group_rules': [
{'ethertype': constants.IPv4,
'protocol': constants.PROTO_NAME_TCP,
'direction': firewall.INGRESS_DIRECTION,
'port_range_min': 123,
'port_range_max': 123,
'security_group_id': SG_ID},
{'ethertype': constants.IPv4,
'protocol': constants.PROTO_NAME_UDP,
'direction': firewall.EGRESS_DIRECTION,
'security_group_id': SG_ID},
{'ethertype': constants.IPv6,
'protocol': constants.PROTO_NAME_TCP,
'remote_group_id': REMOTE_SG_ID,
'direction': firewall.EGRESS_DIRECTION,
'security_group_id': SG_ID}
]}],
'event': 'ALL',
'project_id': PROJECT_ID,
}
]
def set_log_driver_config(ctrl_rate_limit, ctrl_burst_limit):
cfg.CONF.set_override('rate_limit', ctrl_rate_limit, group='network_log')
cfg.CONF.set_override('burst_limit', ctrl_burst_limit, group='network_log')
class TestCookie(base.BaseTestCase):
def setUp(self):
super(TestCookie, self).setUp()
self.cookie = ovsfw_log.Cookie(COOKIE_ID, PORT_ID, ACTION, PROJECT_ID)
self.cookie.log_object_refs = set([LOG_ID])
def test_add_log_object_refs(self):
new_log_id = uuidutils.generate_uuid()
expected = set([LOG_ID, new_log_id])
self.cookie.add_log_obj_ref(new_log_id)
self.assertEqual(expected, self.cookie.log_object_refs)
def test_removed_log_object_ref(self):
expected = set()
self.cookie.remove_log_obj_ref(LOG_ID)
self.assertEqual(expected, self.cookie.log_object_refs)
def test_is_empty(self):
self.cookie.remove_log_obj_ref(LOG_ID)
result = self.cookie.is_empty
self.assertTrue(result)
class FakeOVSPort(object):
def __init__(self, name, port, mac):
self.port_name = name
self.ofport = port
self.vif_mac = mac
class TestOVSFirewallLoggingDriver(base.BaseTestCase):
def setUp(self):
super(TestOVSFirewallLoggingDriver, self).setUp()
mock_bridge = mock.patch.object(
ovs_lib, 'OVSBridge', autospec=True).start()
self.log_driver = ovsfw_log.OVSFirewallLoggingDriver(mock_bridge)
resource_rpc_mock = mock.patch.object(
agent_rpc, 'LoggingApiStub', autospec=True).start()
self.log_driver.start_logapp = mock.Mock()
self.log_driver.initialize(resource_rpc_mock)
self.log_driver.SUPPORTED_LOGGING_TYPES = ['security_group']
self.mock_bridge = self.log_driver.int_br
self.mock_bridge.reset_mock()
self.fake_ovs_port = FakeOVSPort('port', 1, '00:00:00:00:00:00')
self.mock_bridge.br.get_vif_port_by_id.return_value = \
self.fake_ovs_port
log_data = {
'context': None,
'name': 'test1',
'id': LOG_ID,
'project_id': PROJECT_ID,
'event': 'ALL',
'resource_type': 'security_group'
}
self.log_resource = log_object.Log(**log_data)
@property
def port_ofport(self):
return self.mock_bridge.br.get_vif_port_by_id.return_value.ofport
@property
def port_mac(self):
return self.mock_bridge.br.get_vif_port_by_id.return_value.vif_mac
def test_initialize_bridge(self):
br = self.log_driver.initialize_bridge(self.mock_bridge)
self.assertEqual(self.mock_bridge.deferred.return_value, br)
def test_set_controller_rate_limit(self):
set_log_driver_config(100, 25)
self.log_driver.initialize_bridge(self.mock_bridge)
expected_calls = [mock.call.set_controller_rate_limit(100),
mock.call.set_controller_burst_limit(25)]
self.mock_bridge.assert_has_calls(expected_calls)
def test_generate_cookie(self):
cookie_id = self.log_driver.generate_cookie(
PORT_ID, ACTION, LOG_ID, PROJECT_ID)
cookie = self.log_driver._get_cookie_by_id(cookie_id)
self.assertIn(cookie, self.log_driver.cookies_table)
def test__get_cookie_by_id_not_found(self):
cookie_id = uuidutils.generate_uuid()
cookie = ovsfw_log.Cookie(cookie_id=uuidutils.generate_uuid(),
port=PORT_ID, action=ACTION,
project=PROJECT_ID)
self.log_driver.cookies_table = set([cookie])
self.assertRaises(log_exc.CookieNotFound,
self.log_driver._get_cookie_by_id,
cookie_id)
def test_start_log_with_update_or_create_log_event(self):
context = mock.Mock()
log_data = {'log_resources': [self.log_resource]}
self.log_driver.resource_rpc.get_sg_log_info_for_log_resources.\
return_value = FakeSGLogInfo
self.log_driver.start_logging(context, **log_data)
accept_cookie = self.log_driver._get_cookie(PORT_ID, 'ACCEPT')
drop_cookie = self.log_driver._get_cookie(PORT_ID, 'DROP')
conj_id = self.log_driver.conj_id_map.get_conj_id(
SG_ID, REMOTE_SG_ID, firewall.EGRESS_DIRECTION, constants.IPv6)
add_rules = [
# log ingress tcp port=123
mock.call(
actions='controller',
cookie=accept_cookie.id,
ct_state=ovsfw_consts.OF_STATE_NEW_NOT_ESTABLISHED,
reg5=self.port_ofport,
dl_type="0x{:04x}".format(n_const.ETHERTYPE_IP),
nw_proto=constants.PROTO_NUM_TCP,
priority=77,
table=ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
tcp_dst='0x007b'),
# log egress tcp6
mock.call(
actions='controller',
cookie=accept_cookie.id,
ct_state=ovsfw_consts.OF_STATE_NEW_NOT_ESTABLISHED,
reg5=self.port_ofport,
dl_type="0x{:04x}".format(n_const.ETHERTYPE_IPV6),
priority=70,
reg7=conj_id + 1,
table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE),
# log egress udp
mock.call(
actions='controller',
cookie=accept_cookie.id,
ct_state=ovsfw_consts.OF_STATE_NEW_NOT_ESTABLISHED,
reg5=self.port_ofport,
dl_type="0x{:04x}".format(n_const.ETHERTYPE_IP),
nw_proto=constants.PROTO_NUM_UDP,
priority=77,
table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE,
),
# log drop
mock.call(
actions='controller',
cookie=drop_cookie.id,
priority=53,
reg5=self.port_ofport,
table=ovs_consts.DROPPED_TRAFFIC_TABLE,
)
]
self.mock_bridge.br.add_flow.assert_has_calls(
add_rules, any_order=True)
def test_stop_log_with_delete_log_event(self):
context = mock.Mock()
log_data = {'log_resources': [self.log_resource]}
self.log_driver.resource_rpc.get_sg_log_info_for_log_resources.\
return_value = FakeSGLogInfo
self.log_driver.start_logging(context, **log_data)
accept_cookie = self.log_driver._get_cookie(PORT_ID, 'ACCEPT')
drop_cookie = self.log_driver._get_cookie(PORT_ID, 'DROP')
self.mock_bridge.reset_mock()
self.log_driver.stop_logging(context, **log_data)
delete_rules = [
# delete drop flow
mock.call(
table=ovs_consts.DROPPED_TRAFFIC_TABLE,
cookie=drop_cookie.id
),
# delete accept flows
mock.call(
table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE,
cookie=accept_cookie.id
),
mock.call(
table=ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
cookie=accept_cookie.id
)
]
self.mock_bridge.br.delete_flows.assert_has_calls(
delete_rules, any_order=True)
def test_start_log_with_add_port_event(self):
context = mock.Mock()
log_data = {'port_id': PORT_ID}
self.log_driver.resource_rpc.get_sg_log_info_for_port.return_value = \
[
{
'id': uuidutils.generate_uuid(),
'ports_log': [{'port_id': PORT_ID,
'security_group_rules': [
{'ethertype': constants.IPv4,
'protocol': constants.PROTO_NAME_TCP,
'direction':
firewall.INGRESS_DIRECTION,
'port_range_min': 123,
'port_range_max': 123,
'security_group_id': 456}]}],
'event': 'ACCEPT',
'project_id': PROJECT_ID,
}
]
self.log_driver.start_logging(context, **log_data)
accept_cookie = self.log_driver._get_cookie(PORT_ID, 'ACCEPT')
add_rules = [
# log ingress tcp port=123
mock.call(
actions='controller',
cookie=accept_cookie.id,
ct_state=ovsfw_consts.OF_STATE_NEW_NOT_ESTABLISHED,
reg5=self.port_ofport,
dl_type="0x{:04x}".format(n_const.ETHERTYPE_IP),
nw_proto=constants.PROTO_NUM_TCP,
priority=77,
table=ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
tcp_dst='0x007b')
]
self.mock_bridge.br.add_flow.assert_has_calls(
add_rules, any_order=True)
def test_stop_log_with_delete_port_event(self):
context = mock.Mock()
log_data = {'port_id': PORT_ID}
# add port
self.log_driver.resource_rpc.get_sg_log_info_for_port.return_value = \
FakeSGLogInfo
self.log_driver.start_logging(context, **log_data)
accept_cookie = self.log_driver._get_cookie(PORT_ID, 'ACCEPT')
drop_cookie = self.log_driver._get_cookie(PORT_ID, 'DROP')
self.mock_bridge.reset_mock()
# delete port
self.log_driver.stop_logging(
context, port_id=PORT_ID)
delete_rules = [
# delete accept flows
mock.call(
table=ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
cookie=accept_cookie.id
),
mock.call(
table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE,
cookie=accept_cookie.id
),
# delete drop flow
mock.call(
table=ovs_consts.DROPPED_TRAFFIC_TABLE,
cookie=drop_cookie.id
),
]
self.mock_bridge.br.delete_flows.assert_has_calls(
delete_rules, any_order=True)

View File

@ -117,6 +117,8 @@ neutron.agent.l2.extensions =
log = neutron.services.logapi.agent.log_extension:LoggingExtension
neutron.agent.l3.extensions =
fip_qos = neutron.agent.l3.extensions.fip_qos:FipQosAgentExtension
neutron.services.logapi.drivers =
ovs = neutron.services.logapi.drivers.openvswitch.ovs_firewall_log:OVSFirewallLoggingDriver
neutron.qos.agent_drivers =
ovs = neutron.plugins.ml2.drivers.openvswitch.agent.extension_drivers.qos_driver:QosOVSAgentDriver
sriov = neutron.plugins.ml2.drivers.mech_sriov.agent.extension_drivers.qos_driver:QosSRIOVAgentDriver