bda2c484cb
This patch adds all RPC workers to consumers of FWaaS plugin queue. This is important for DVR-enabled deployments with hundreds of agents. Change-Id: I656ef6248379c845c4c7862fabd26eedd8b7fd46 Depends-On: I6fea7f409c91b25d2c35b038d6100fdfa85d1905 Related-Bug: #1498844
393 lines
16 KiB
Python
393 lines
16 KiB
Python
# Copyright 2013 Big Switch Networks, Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from neutron.api.v2 import attributes as attr
|
|
from neutron.common import rpc as n_rpc
|
|
from neutron.common import topics
|
|
from neutron import context as neutron_context
|
|
from neutron.i18n import _LW
|
|
from neutron import manager
|
|
from neutron.plugins.common import constants as const
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
import oslo_messaging
|
|
|
|
from neutron_fwaas.db.firewall import firewall_db
|
|
from neutron_fwaas.db.firewall import firewall_router_insertion_db
|
|
from neutron_fwaas.extensions import firewall as fw_ext
|
|
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class FirewallCallbacks(object):
|
|
target = oslo_messaging.Target(version='1.0')
|
|
|
|
def __init__(self, plugin):
|
|
super(FirewallCallbacks, self).__init__()
|
|
self.plugin = plugin
|
|
|
|
def set_firewall_status(self, context, firewall_id, status, **kwargs):
|
|
"""Agent uses this to set a firewall's status."""
|
|
LOG.debug("Setting firewall %s to status: %s" % (firewall_id, status))
|
|
# Sanitize status first
|
|
if status in (const.ACTIVE, const.DOWN, const.INACTIVE):
|
|
to_update = status
|
|
else:
|
|
to_update = const.ERROR
|
|
# ignore changing status if firewall expects to be deleted
|
|
# That case means that while some pending operation has been
|
|
# performed on the backend, neutron server received delete request
|
|
# and changed firewall status to PENDING_DELETE
|
|
updated = self.plugin.update_firewall_status(
|
|
context, firewall_id, to_update, not_in=(const.PENDING_DELETE,))
|
|
if updated:
|
|
LOG.debug("firewall %s status set: %s" % (firewall_id, to_update))
|
|
return updated and to_update != const.ERROR
|
|
|
|
def firewall_deleted(self, context, firewall_id, **kwargs):
|
|
"""Agent uses this to indicate firewall is deleted."""
|
|
LOG.debug("firewall_deleted() called")
|
|
with context.session.begin(subtransactions=True):
|
|
fw_db = self.plugin._get_firewall(context, firewall_id)
|
|
# allow to delete firewalls in ERROR state
|
|
if fw_db.status in (const.PENDING_DELETE, const.ERROR):
|
|
self.plugin.delete_db_firewall_object(context, firewall_id)
|
|
return True
|
|
else:
|
|
LOG.warn(_LW('Firewall %(fw)s unexpectedly deleted by agent, '
|
|
'status was %(status)s'),
|
|
{'fw': firewall_id, 'status': fw_db.status})
|
|
fw_db.update({"status": const.ERROR})
|
|
return False
|
|
|
|
def get_firewalls_for_tenant(self, context, **kwargs):
|
|
"""Agent uses this to get all firewalls and rules for a tenant."""
|
|
LOG.debug("get_firewalls_for_tenant() called")
|
|
fw_list = []
|
|
for fw in self.plugin.get_firewalls(context):
|
|
fw_with_rules = self.plugin._make_firewall_dict_with_rules(
|
|
context, fw['id'])
|
|
if fw['status'] == const.PENDING_DELETE:
|
|
fw_with_rules['add-router-ids'] = []
|
|
fw_with_rules['del-router-ids'] = (
|
|
self.plugin.get_firewall_routers(context, fw['id']))
|
|
else:
|
|
fw_with_rules['add-router-ids'] = (
|
|
self.plugin.get_firewall_routers(context, fw['id']))
|
|
fw_with_rules['del-router-ids'] = []
|
|
fw_list.append(fw_with_rules)
|
|
return fw_list
|
|
|
|
def get_firewalls_for_tenant_without_rules(self, context, **kwargs):
|
|
"""Agent uses this to get all firewalls for a tenant."""
|
|
LOG.debug("get_firewalls_for_tenant_without_rules() called")
|
|
fw_list = [fw for fw in self.plugin.get_firewalls(context)]
|
|
return fw_list
|
|
|
|
def get_tenants_with_firewalls(self, context, **kwargs):
|
|
"""Agent uses this to get all tenants that have firewalls."""
|
|
LOG.debug("get_tenants_with_firewalls() called")
|
|
ctx = neutron_context.get_admin_context()
|
|
fw_list = self.plugin.get_firewalls(ctx)
|
|
fw_tenant_list = list(set(fw['tenant_id'] for fw in fw_list))
|
|
return fw_tenant_list
|
|
|
|
|
|
class FirewallAgentApi(object):
|
|
"""Plugin side of plugin to agent RPC API."""
|
|
|
|
def __init__(self, topic, host):
|
|
self.host = host
|
|
target = oslo_messaging.Target(topic=topic, version='1.0')
|
|
self.client = n_rpc.get_client(target)
|
|
|
|
def create_firewall(self, context, firewall):
|
|
cctxt = self.client.prepare(fanout=True)
|
|
cctxt.cast(context, 'create_firewall', firewall=firewall,
|
|
host=self.host)
|
|
|
|
def update_firewall(self, context, firewall):
|
|
cctxt = self.client.prepare(fanout=True)
|
|
cctxt.cast(context, 'update_firewall', firewall=firewall,
|
|
host=self.host)
|
|
|
|
def delete_firewall(self, context, firewall):
|
|
cctxt = self.client.prepare(fanout=True)
|
|
cctxt.cast(context, 'delete_firewall', firewall=firewall,
|
|
host=self.host)
|
|
|
|
|
|
class FirewallPlugin(
|
|
firewall_db.Firewall_db_mixin,
|
|
firewall_router_insertion_db.FirewallRouterInsertionDbMixin):
|
|
|
|
"""Implementation of the Neutron Firewall Service Plugin.
|
|
|
|
This class manages the workflow of FWaaS request/response.
|
|
Most DB related works are implemented in class
|
|
firewall_db.Firewall_db_mixin.
|
|
"""
|
|
supported_extension_aliases = ["fwaas", "fwaasrouterinsertion"]
|
|
path_prefix = fw_ext.FIREWALL_PREFIX
|
|
|
|
def __init__(self):
|
|
"""Do the initialization for the firewall service plugin here."""
|
|
self.start_rpc_listeners()
|
|
|
|
self.agent_rpc = FirewallAgentApi(
|
|
topics.L3_AGENT,
|
|
cfg.CONF.host
|
|
)
|
|
firewall_db.subscribe()
|
|
|
|
def start_rpc_listeners(self):
|
|
self.endpoints = [FirewallCallbacks(self)]
|
|
|
|
self.conn = n_rpc.create_connection(new=True)
|
|
self.conn.create_consumer(
|
|
topics.FIREWALL_PLUGIN, self.endpoints, fanout=False)
|
|
return self.conn.consume_in_threads()
|
|
|
|
def _rpc_update_firewall(self, context, firewall_id):
|
|
status_update = {"firewall": {"status": const.PENDING_UPDATE}}
|
|
super(FirewallPlugin, self).update_firewall(context, firewall_id,
|
|
status_update)
|
|
fw_with_rules = self._make_firewall_dict_with_rules(context,
|
|
firewall_id)
|
|
# this is triggered on an update to fw rule or policy, no
|
|
# change in associated routers.
|
|
fw_with_rules['add-router-ids'] = self.get_firewall_routers(
|
|
context, firewall_id)
|
|
fw_with_rules['del-router-ids'] = []
|
|
self.agent_rpc.update_firewall(context, fw_with_rules)
|
|
|
|
def _rpc_update_firewall_policy(self, context, firewall_policy_id):
|
|
firewall_policy = self.get_firewall_policy(context, firewall_policy_id)
|
|
if firewall_policy:
|
|
for firewall_id in firewall_policy['firewall_list']:
|
|
self._rpc_update_firewall(context, firewall_id)
|
|
|
|
def _ensure_update_firewall(self, context, firewall_id):
|
|
fwall = self.get_firewall(context, firewall_id)
|
|
if fwall['status'] in [const.PENDING_CREATE,
|
|
const.PENDING_UPDATE,
|
|
const.PENDING_DELETE]:
|
|
raise fw_ext.FirewallInPendingState(firewall_id=firewall_id,
|
|
pending_state=fwall['status'])
|
|
|
|
def _ensure_update_firewall_policy(self, context, firewall_policy_id):
|
|
firewall_policy = self.get_firewall_policy(context, firewall_policy_id)
|
|
if firewall_policy and 'firewall_list' in firewall_policy:
|
|
for firewall_id in firewall_policy['firewall_list']:
|
|
self._ensure_update_firewall(context, firewall_id)
|
|
|
|
def _ensure_update_firewall_rule(self, context, firewall_rule_id):
|
|
fw_rule = self.get_firewall_rule(context, firewall_rule_id)
|
|
if 'firewall_policy_id' in fw_rule and fw_rule['firewall_policy_id']:
|
|
self._ensure_update_firewall_policy(context,
|
|
fw_rule['firewall_policy_id'])
|
|
|
|
def _get_routers_for_create_firewall(self, tenant_id, context, firewall):
|
|
|
|
# pop router_id as this goes in the router association db
|
|
# and not firewall db
|
|
router_ids = firewall['firewall'].pop('router_ids', None)
|
|
if router_ids == attr.ATTR_NOT_SPECIFIED:
|
|
# old semantics router-ids keyword not specified pick up
|
|
# all routers on tenant.
|
|
l3_plugin = manager.NeutronManager.get_service_plugins().get(
|
|
const.L3_ROUTER_NAT)
|
|
ctx = neutron_context.get_admin_context()
|
|
routers = l3_plugin.get_routers(ctx)
|
|
router_ids = [
|
|
router['id']
|
|
for router in routers
|
|
if router['tenant_id'] == tenant_id]
|
|
# validation can still fail this if there is another fw
|
|
# which is associated with one of these routers.
|
|
self.validate_firewall_routers_not_in_use(context, router_ids)
|
|
return router_ids
|
|
else:
|
|
if not router_ids:
|
|
# This indicates that user specifies no routers.
|
|
return []
|
|
else:
|
|
# some router(s) provided.
|
|
self.validate_firewall_routers_not_in_use(context, router_ids)
|
|
return router_ids
|
|
|
|
def create_firewall(self, context, firewall):
|
|
LOG.debug("create_firewall() called")
|
|
tenant_id = self._get_tenant_id_for_create(context,
|
|
firewall['firewall'])
|
|
|
|
fw_new_rtrs = self._get_routers_for_create_firewall(
|
|
tenant_id, context, firewall)
|
|
|
|
if not fw_new_rtrs:
|
|
# no messaging to agent needed, and fw needs to go
|
|
# to INACTIVE(no associated rtrs) state.
|
|
status = const.INACTIVE
|
|
fw = super(FirewallPlugin, self).create_firewall(
|
|
context, firewall, status)
|
|
fw['router_ids'] = []
|
|
return fw
|
|
else:
|
|
fw = super(FirewallPlugin, self).create_firewall(
|
|
context, firewall)
|
|
fw['router_ids'] = fw_new_rtrs
|
|
|
|
fw_with_rules = (
|
|
self._make_firewall_dict_with_rules(context, fw['id']))
|
|
|
|
fw_with_rtrs = {'fw_id': fw['id'],
|
|
'router_ids': fw_new_rtrs}
|
|
self.set_routers_for_firewall(context, fw_with_rtrs)
|
|
fw_with_rules['add-router-ids'] = fw_new_rtrs
|
|
fw_with_rules['del-router-ids'] = []
|
|
|
|
self.agent_rpc.create_firewall(context, fw_with_rules)
|
|
|
|
return fw
|
|
|
|
def update_firewall(self, context, id, firewall):
|
|
LOG.debug("update_firewall() called on firewall %s", id)
|
|
|
|
self._ensure_update_firewall(context, id)
|
|
# pop router_id as this goes in the router association db
|
|
# and not firewall db
|
|
router_ids = firewall['firewall'].pop('router_ids', None)
|
|
fw_current_rtrs = self.get_firewall_routers(context, id)
|
|
if router_ids is not None:
|
|
if router_ids == []:
|
|
# This indicates that user is indicating no routers.
|
|
fw_new_rtrs = []
|
|
else:
|
|
self.validate_firewall_routers_not_in_use(
|
|
context, router_ids, id)
|
|
fw_new_rtrs = router_ids
|
|
self.update_firewall_routers(context, {'fw_id': id,
|
|
'router_ids': fw_new_rtrs})
|
|
else:
|
|
# router-ids keyword not specified for update pick up
|
|
# existing routers.
|
|
fw_new_rtrs = self.get_firewall_routers(context, id)
|
|
|
|
if not fw_new_rtrs and not fw_current_rtrs:
|
|
# no messaging to agent needed, and we need to continue
|
|
# in INACTIVE state
|
|
firewall['firewall']['status'] = const.INACTIVE
|
|
fw = super(FirewallPlugin, self).update_firewall(
|
|
context, id, firewall)
|
|
fw['router_ids'] = []
|
|
return fw
|
|
else:
|
|
firewall['firewall']['status'] = const.PENDING_UPDATE
|
|
fw = super(FirewallPlugin, self).update_firewall(
|
|
context, id, firewall)
|
|
fw['router_ids'] = fw_new_rtrs
|
|
|
|
fw_with_rules = (
|
|
self._make_firewall_dict_with_rules(context, fw['id']))
|
|
|
|
# determine rtrs to add fw to and del from
|
|
fw_with_rules['add-router-ids'] = fw_new_rtrs
|
|
fw_with_rules['del-router-ids'] = list(
|
|
set(fw_current_rtrs).difference(set(fw_new_rtrs)))
|
|
|
|
# last-router drives agent to ack with status to set state to INACTIVE
|
|
fw_with_rules['last-router'] = not fw_new_rtrs
|
|
|
|
LOG.debug("update_firewall %s: Add Routers: %s, Del Routers: %s",
|
|
fw['id'],
|
|
fw_with_rules['add-router-ids'],
|
|
fw_with_rules['del-router-ids'])
|
|
|
|
self.agent_rpc.update_firewall(context, fw_with_rules)
|
|
|
|
return fw
|
|
|
|
def delete_db_firewall_object(self, context, id):
|
|
super(FirewallPlugin, self).delete_firewall(context, id)
|
|
|
|
def delete_firewall(self, context, id):
|
|
LOG.debug("delete_firewall() called on firewall %s", id)
|
|
fw_with_rules = (
|
|
self._make_firewall_dict_with_rules(context, id))
|
|
fw_with_rules['del-router-ids'] = self.get_firewall_routers(
|
|
context, id)
|
|
fw_with_rules['add-router-ids'] = []
|
|
if not fw_with_rules['del-router-ids']:
|
|
# no routers to delete on the agent side
|
|
self.delete_db_firewall_object(context, id)
|
|
else:
|
|
status = {"firewall": {"status": const.PENDING_DELETE}}
|
|
super(FirewallPlugin, self).update_firewall(context, id, status)
|
|
# Reflect state change in fw_with_rules
|
|
fw_with_rules['status'] = status['firewall']['status']
|
|
self.agent_rpc.delete_firewall(context, fw_with_rules)
|
|
|
|
def update_firewall_policy(self, context, id, firewall_policy):
|
|
LOG.debug("update_firewall_policy() called")
|
|
self._ensure_update_firewall_policy(context, id)
|
|
fwp = super(FirewallPlugin,
|
|
self).update_firewall_policy(context, id, firewall_policy)
|
|
self._rpc_update_firewall_policy(context, id)
|
|
return fwp
|
|
|
|
def update_firewall_rule(self, context, id, firewall_rule):
|
|
LOG.debug("update_firewall_rule() called")
|
|
self._ensure_update_firewall_rule(context, id)
|
|
fwr = super(FirewallPlugin,
|
|
self).update_firewall_rule(context, id, firewall_rule)
|
|
firewall_policy_id = fwr['firewall_policy_id']
|
|
if firewall_policy_id:
|
|
self._rpc_update_firewall_policy(context, firewall_policy_id)
|
|
return fwr
|
|
|
|
def insert_rule(self, context, id, rule_info):
|
|
LOG.debug("insert_rule() called")
|
|
self._ensure_update_firewall_policy(context, id)
|
|
fwp = super(FirewallPlugin,
|
|
self).insert_rule(context, id, rule_info)
|
|
self._rpc_update_firewall_policy(context, id)
|
|
return fwp
|
|
|
|
def remove_rule(self, context, id, rule_info):
|
|
LOG.debug("remove_rule() called")
|
|
self._ensure_update_firewall_policy(context, id)
|
|
fwp = super(FirewallPlugin,
|
|
self).remove_rule(context, id, rule_info)
|
|
self._rpc_update_firewall_policy(context, id)
|
|
return fwp
|
|
|
|
def get_firewalls(self, context, filters=None, fields=None):
|
|
LOG.debug("fwaas get_firewalls() called")
|
|
fw_list = super(FirewallPlugin, self).get_firewalls(
|
|
context, filters, fields)
|
|
for fw in fw_list:
|
|
fw_current_rtrs = self.get_firewall_routers(context, fw['id'])
|
|
fw['router_ids'] = fw_current_rtrs
|
|
return fw_list
|
|
|
|
def get_firewall(self, context, id, fields=None):
|
|
LOG.debug("fwaas get_firewall() called")
|
|
res = super(FirewallPlugin, self).get_firewall(
|
|
context, id, fields)
|
|
fw_current_rtrs = self.get_firewall_routers(context, id)
|
|
res['router_ids'] = fw_current_rtrs
|
|
return res
|