neutron/neutron/db/l3_hamode_db.py

631 lines
27 KiB
Python

# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import netaddr
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
from oslo_utils import excutils
import sqlalchemy as sa
from sqlalchemy import orm
from neutron._i18n import _, _LI
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import utils as n_utils
from neutron.db import agents_db
from neutron.db.availability_zone import router as router_az_db
from neutron.db import l3_attrs_db
from neutron.db import l3_db
from neutron.db import l3_dvr_db
from neutron.db import model_base
from neutron.db import models_v2
from neutron.extensions import l3_ext_ha_mode as l3_ha
from neutron.extensions import portbindings
from neutron.extensions import providernet
from neutron.plugins.common import utils as p_utils
VR_ID_RANGE = set(range(1, 255))
MAX_ALLOCATION_TRIES = 10
UNLIMITED_AGENTS_PER_ROUTER = 0
LOG = logging.getLogger(__name__)
L3_HA_OPTS = [
cfg.BoolOpt('l3_ha',
default=False,
help=_('Enable HA mode for virtual routers.')),
cfg.IntOpt('max_l3_agents_per_router',
default=3,
help=_("Maximum number of L3 agents which a HA router will be "
"scheduled on. If it is set to 0 then the router will "
"be scheduled on every agent.")),
cfg.IntOpt('min_l3_agents_per_router',
default=constants.MINIMUM_AGENTS_FOR_HA,
help=_("Minimum number of L3 agents which a HA router will be "
"scheduled on. If it is set to 0 then the router will "
"be scheduled on every agent.")),
cfg.StrOpt('l3_ha_net_cidr',
default='169.254.192.0/18',
help=_('Subnet used for the l3 HA admin network.')),
cfg.StrOpt('l3_ha_network_type', default='',
help=_("The network type to use when creating the HA network "
"for an HA router. By default or if empty, the first "
"'tenant_network_types' is used. This is helpful when "
"the VRRP traffic should use a specific network which "
"is not the default one.")),
cfg.StrOpt('l3_ha_network_physical_name', default='',
help=_("The physical network name with which the HA network "
"can be created."))
]
cfg.CONF.register_opts(L3_HA_OPTS)
class L3HARouterAgentPortBinding(model_base.BASEV2):
"""Represent agent binding state of a HA router port.
A HA Router has one HA port per agent on which it is spawned.
This binding table stores which port is used for a HA router by a
L3 agent.
"""
__tablename__ = 'ha_router_agent_port_bindings'
port_id = sa.Column(sa.String(36), sa.ForeignKey('ports.id',
ondelete='CASCADE'),
nullable=False, primary_key=True)
port = orm.relationship(models_v2.Port)
router_id = sa.Column(sa.String(36), sa.ForeignKey('routers.id',
ondelete='CASCADE'),
nullable=False)
l3_agent_id = sa.Column(sa.String(36),
sa.ForeignKey("agents.id",
ondelete='CASCADE'))
agent = orm.relationship(agents_db.Agent)
state = sa.Column(sa.Enum(constants.HA_ROUTER_STATE_ACTIVE,
constants.HA_ROUTER_STATE_STANDBY,
name='l3_ha_states'),
default=constants.HA_ROUTER_STATE_STANDBY,
server_default=constants.HA_ROUTER_STATE_STANDBY)
class L3HARouterNetwork(model_base.BASEV2):
"""Host HA network for a tenant.
One HA Network is used per tenant, all HA router ports are created
on this network.
"""
__tablename__ = 'ha_router_networks'
tenant_id = sa.Column(sa.String(attributes.TENANT_ID_MAX_LEN),
primary_key=True, nullable=False)
network_id = sa.Column(sa.String(36),
sa.ForeignKey('networks.id', ondelete="CASCADE"),
nullable=False, primary_key=True)
network = orm.relationship(models_v2.Network)
class L3HARouterVRIdAllocation(model_base.BASEV2):
"""VRID allocation per HA network.
Keep a track of the VRID allocations per HA network.
"""
__tablename__ = 'ha_router_vrid_allocations'
network_id = sa.Column(sa.String(36),
sa.ForeignKey('networks.id', ondelete="CASCADE"),
nullable=False, primary_key=True)
vr_id = sa.Column(sa.Integer(), nullable=False, primary_key=True)
class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
router_az_db.RouterAvailabilityZoneMixin):
"""Mixin class to add high availability capability to routers."""
extra_attributes = (
l3_dvr_db.L3_NAT_with_dvr_db_mixin.extra_attributes +
router_az_db.RouterAvailabilityZoneMixin.extra_attributes + [
{'name': 'ha', 'default': cfg.CONF.l3_ha},
{'name': 'ha_vr_id', 'default': 0}])
def _verify_configuration(self):
self.ha_cidr = cfg.CONF.l3_ha_net_cidr
try:
net = netaddr.IPNetwork(self.ha_cidr)
except netaddr.AddrFormatError:
raise l3_ha.HANetworkCIDRNotValid(cidr=self.ha_cidr)
if ('/' not in self.ha_cidr or net.network != net.ip):
raise l3_ha.HANetworkCIDRNotValid(cidr=self.ha_cidr)
self._check_num_agents_per_router()
def _check_num_agents_per_router(self):
max_agents = cfg.CONF.max_l3_agents_per_router
min_agents = cfg.CONF.min_l3_agents_per_router
if (max_agents != UNLIMITED_AGENTS_PER_ROUTER
and max_agents < min_agents):
raise l3_ha.HAMaximumAgentsNumberNotValid(
max_agents=max_agents, min_agents=min_agents)
if min_agents < constants.MINIMUM_AGENTS_FOR_HA:
raise l3_ha.HAMinimumAgentsNumberNotValid()
def __init__(self):
self._verify_configuration()
super(L3_HA_NAT_db_mixin, self).__init__()
def get_ha_network(self, context, tenant_id):
return (context.session.query(L3HARouterNetwork).
filter(L3HARouterNetwork.tenant_id == tenant_id).
first())
def _get_allocated_vr_id(self, context, network_id):
with context.session.begin(subtransactions=True):
query = (context.session.query(L3HARouterVRIdAllocation).
filter(L3HARouterVRIdAllocation.network_id == network_id))
allocated_vr_ids = set(a.vr_id for a in query) - set([0])
return allocated_vr_ids
def _allocate_vr_id(self, context, network_id, router_id):
for count in range(MAX_ALLOCATION_TRIES):
try:
with context.session.begin(subtransactions=True):
allocated_vr_ids = self._get_allocated_vr_id(context,
network_id)
available_vr_ids = VR_ID_RANGE - allocated_vr_ids
if not available_vr_ids:
raise l3_ha.NoVRIDAvailable(router_id=router_id)
allocation = L3HARouterVRIdAllocation()
allocation.network_id = network_id
allocation.vr_id = available_vr_ids.pop()
context.session.add(allocation)
return allocation.vr_id
except db_exc.DBDuplicateEntry:
LOG.info(_LI("Attempt %(count)s to allocate a VRID in the "
"network %(network)s for the router %(router)s"),
{'count': count, 'network': network_id,
'router': router_id})
raise l3_ha.MaxVRIDAllocationTriesReached(
network_id=network_id, router_id=router_id,
max_tries=MAX_ALLOCATION_TRIES)
def _delete_vr_id_allocation(self, context, ha_network, vr_id):
with context.session.begin(subtransactions=True):
context.session.query(L3HARouterVRIdAllocation).filter_by(
network_id=ha_network.network_id,
vr_id=vr_id).delete()
def _set_vr_id(self, context, router, ha_network):
with context.session.begin(subtransactions=True):
router.extra_attributes.ha_vr_id = self._allocate_vr_id(
context, ha_network.network_id, router.id)
def _create_ha_subnet(self, context, network_id, tenant_id):
args = {'network_id': network_id,
'tenant_id': '',
'name': constants.HA_SUBNET_NAME % tenant_id,
'ip_version': 4,
'cidr': cfg.CONF.l3_ha_net_cidr,
'enable_dhcp': False,
'gateway_ip': None}
return p_utils.create_subnet(self._core_plugin, context,
{'subnet': args})
def _create_ha_network_tenant_binding(self, context, tenant_id,
network_id):
with context.session.begin(subtransactions=True):
ha_network = L3HARouterNetwork(tenant_id=tenant_id,
network_id=network_id)
context.session.add(ha_network)
return ha_network
def _add_ha_network_settings(self, network):
if cfg.CONF.l3_ha_network_type:
network[providernet.NETWORK_TYPE] = cfg.CONF.l3_ha_network_type
if cfg.CONF.l3_ha_network_physical_name:
network[providernet.PHYSICAL_NETWORK] = (
cfg.CONF.l3_ha_network_physical_name)
def _create_ha_network(self, context, tenant_id):
admin_ctx = context.elevated()
args = {'network':
{'name': constants.HA_NETWORK_NAME % tenant_id,
'tenant_id': '',
'shared': False,
'admin_state_up': True}}
self._add_ha_network_settings(args['network'])
network = p_utils.create_network(self._core_plugin, admin_ctx, args)
try:
ha_network = self._create_ha_network_tenant_binding(admin_ctx,
tenant_id,
network['id'])
except Exception:
with excutils.save_and_reraise_exception():
self._core_plugin.delete_network(admin_ctx, network['id'])
try:
self._create_ha_subnet(admin_ctx, network['id'], tenant_id)
except Exception:
with excutils.save_and_reraise_exception():
self._core_plugin.delete_network(admin_ctx, network['id'])
return ha_network
def get_number_of_agents_for_scheduling(self, context):
"""Return the number of agents on which the router will be scheduled.
Raises an exception if there are not enough agents available to honor
the min_agents config parameter. If the max_agents parameter is set to
0 all the agents will be used.
"""
min_agents = cfg.CONF.min_l3_agents_per_router
num_agents = len(self.get_l3_agents(context, active=True,
filters={'agent_modes': [constants.L3_AGENT_MODE_LEGACY,
constants.L3_AGENT_MODE_DVR_SNAT]}))
max_agents = cfg.CONF.max_l3_agents_per_router
if max_agents:
if max_agents > num_agents:
LOG.info(_LI("Number of active agents lower than "
"max_l3_agents_per_router. L3 agents "
"available: %s"), num_agents)
else:
num_agents = max_agents
if num_agents < min_agents:
raise l3_ha.HANotEnoughAvailableAgents(min_agents=min_agents,
num_agents=num_agents)
return num_agents
def _create_ha_port_binding(self, context, port_id, router_id):
with context.session.begin(subtransactions=True):
portbinding = L3HARouterAgentPortBinding(port_id=port_id,
router_id=router_id)
context.session.add(portbinding)
return portbinding
def add_ha_port(self, context, router_id, network_id, tenant_id):
args = {'tenant_id': '',
'network_id': network_id,
'admin_state_up': True,
'device_id': router_id,
'device_owner': constants.DEVICE_OWNER_ROUTER_HA_INTF,
'name': constants.HA_PORT_NAME % tenant_id}
port = p_utils.create_port(self._core_plugin, context,
{'port': args})
try:
return self._create_ha_port_binding(context, port['id'], router_id)
except Exception:
with excutils.save_and_reraise_exception():
self._core_plugin.delete_port(context, port['id'],
l3_port_check=False)
def _create_ha_interfaces(self, context, router, ha_network):
admin_ctx = context.elevated()
num_agents = self.get_number_of_agents_for_scheduling(context)
port_ids = []
try:
for index in range(num_agents):
binding = self.add_ha_port(admin_ctx, router.id,
ha_network.network['id'],
router.tenant_id)
port_ids.append(binding.port_id)
except Exception:
with excutils.save_and_reraise_exception():
for port_id in port_ids:
self._core_plugin.delete_port(admin_ctx, port_id,
l3_port_check=False)
def _delete_ha_interfaces(self, context, router_id):
admin_ctx = context.elevated()
device_filter = {'device_id': [router_id],
'device_owner':
[constants.DEVICE_OWNER_ROUTER_HA_INTF]}
ports = self._core_plugin.get_ports(admin_ctx, filters=device_filter)
for port in ports:
self._core_plugin.delete_port(admin_ctx, port['id'],
l3_port_check=False)
def delete_ha_interfaces_on_host(self, context, router_id, host):
admin_ctx = context.elevated()
port_ids = (binding.port_id for binding
in self.get_ha_router_port_bindings(admin_ctx,
[router_id], host))
for port_id in port_ids:
self._core_plugin.delete_port(admin_ctx, port_id,
l3_port_check=False)
def _notify_ha_interfaces_updated(self, context, router_id):
self.l3_rpc_notifier.routers_updated(
context, [router_id], shuffle_agents=True)
@classmethod
def _is_ha(cls, router):
ha = router.get('ha')
if not attributes.is_attr_set(ha):
ha = cfg.CONF.l3_ha
return ha
def create_router(self, context, router):
is_ha = self._is_ha(router['router'])
if is_ha and l3_dvr_db.is_distributed_router(router['router']):
raise l3_ha.DistributedHARouterNotSupported()
router['router']['ha'] = is_ha
router_dict = super(L3_HA_NAT_db_mixin,
self).create_router(context, router)
if is_ha:
try:
router_db = self._get_router(context, router_dict['id'])
ha_network = self.get_ha_network(context,
router_db.tenant_id)
if not ha_network:
ha_network = self._create_ha_network(context,
router_db.tenant_id)
self._set_vr_id(context, router_db, ha_network)
self._create_ha_interfaces(context, router_db, ha_network)
self._notify_ha_interfaces_updated(context, router_db.id)
except Exception:
with excutils.save_and_reraise_exception():
self.delete_router(context, router_dict['id'])
router_dict['ha_vr_id'] = router_db.extra_attributes.ha_vr_id
return router_dict
def _update_router_db(self, context, router_id, data):
router_db = self._get_router(context, router_id)
original_distributed_state = router_db.extra_attributes.distributed
original_ha_state = router_db.extra_attributes.ha
requested_ha_state = data.pop('ha', None)
requested_distributed_state = data.get('distributed', None)
if ((original_ha_state and requested_distributed_state) or
(requested_ha_state and original_distributed_state) or
(requested_ha_state and requested_distributed_state)):
raise l3_ha.DistributedHARouterNotSupported()
with context.session.begin(subtransactions=True):
router_db = super(L3_HA_NAT_db_mixin, self)._update_router_db(
context, router_id, data)
ha_not_changed = (requested_ha_state is None or
requested_ha_state == original_ha_state)
if ha_not_changed:
return router_db
if router_db.admin_state_up:
msg = _('Cannot change HA attribute of active routers. Please '
'set router admin_state_up to False prior to upgrade.')
raise n_exc.BadRequest(resource='router', msg=msg)
ha_network = self.get_ha_network(context,
router_db.tenant_id)
router_db.extra_attributes.ha = requested_ha_state
if not requested_ha_state:
self._delete_vr_id_allocation(
context, ha_network, router_db.extra_attributes.ha_vr_id)
router_db.extra_attributes.ha_vr_id = None
# The HA attribute has changed. First unbind the router from agents
# to force a proper re-scheduling to agents.
# TODO(jschwarz): This will have to be more selective to get HA + DVR
# working (Only unbind from dvr_snat nodes).
self._unbind_ha_router(context, router_id)
if requested_ha_state:
if not ha_network:
ha_network = self._create_ha_network(context,
router_db.tenant_id)
self._set_vr_id(context, router_db, ha_network)
self._create_ha_interfaces(context, router_db, ha_network)
self._notify_ha_interfaces_updated(context, router_db.id)
else:
self._delete_ha_interfaces(context, router_db.id)
self._notify_ha_interfaces_updated(context, router_db.id)
return router_db
def _delete_ha_network(self, context, net):
admin_ctx = context.elevated()
self._core_plugin.delete_network(admin_ctx, net.network_id)
def _ha_routers_present(self, context, tenant_id):
ha = True
routers = context.session.query(l3_db.Router).filter(
l3_db.Router.tenant_id == tenant_id).subquery()
ha_routers = context.session.query(
l3_attrs_db.RouterExtraAttributes).join(
routers,
l3_attrs_db.RouterExtraAttributes.router_id == routers.c.id
).filter(l3_attrs_db.RouterExtraAttributes.ha == ha).first()
return ha_routers is not None
def delete_router(self, context, id):
router_db = self._get_router(context, id)
super(L3_HA_NAT_db_mixin, self).delete_router(context, id)
if router_db.extra_attributes.ha:
ha_network = self.get_ha_network(context,
router_db.tenant_id)
if ha_network:
self._delete_vr_id_allocation(
context, ha_network, router_db.extra_attributes.ha_vr_id)
self._delete_ha_interfaces(context, router_db.id)
# In case that create HA router failed because of the failure
# in HA network creation. So here put this deleting HA network
# procedure under 'if ha_network' block.
if not self._ha_routers_present(context,
router_db.tenant_id):
try:
self._delete_ha_network(context, ha_network)
except (n_exc.NetworkNotFound,
orm.exc.ObjectDeletedError):
LOG.debug(
"HA network for tenant %s was already deleted.",
router_db.tenant_id)
except sa.exc.InvalidRequestError:
LOG.info(_LI("HA network %s can not be deleted."),
ha_network.network_id)
except n_exc.NetworkInUse:
LOG.debug("HA network %s is still in use.",
ha_network.network_id)
else:
LOG.info(_LI("HA network %(network)s was deleted as "
"no HA routers are present in tenant "
"%(tenant)s."),
{'network': ha_network.network_id,
'tenant': router_db.tenant_id})
def _unbind_ha_router(self, context, router_id):
for agent in self.get_l3_agents_hosting_routers(context, [router_id]):
self.remove_router_from_l3_agent(context, agent['id'], router_id)
def get_ha_router_port_bindings(self, context, router_ids, host=None):
if not router_ids:
return []
query = context.session.query(L3HARouterAgentPortBinding)
if host:
query = query.join(agents_db.Agent).filter(
agents_db.Agent.host == host)
query = query.filter(
L3HARouterAgentPortBinding.router_id.in_(router_ids))
return query.all()
def get_l3_bindings_hosting_router_with_ha_states(
self, context, router_id):
"""Return a list of [(agent, ha_state), ...]."""
bindings = self.get_ha_router_port_bindings(context, [router_id])
return [(binding.agent, binding.state) for binding in bindings
if binding.agent is not None]
def get_active_host_for_ha_router(self, context, router_id):
bindings = self.get_l3_bindings_hosting_router_with_ha_states(
context, router_id)
# TODO(amuller): In case we have two or more actives, this method
# needs to return the last agent to become active. This requires
# timestamps for state changes. Otherwise, if a host goes down
# and another takes over, we'll have two actives. In this case,
# if an interface is added to a router, its binding might be wrong
# and l2pop would not work correctly.
return next(
(agent.host for agent, state in bindings
if state == constants.HA_ROUTER_STATE_ACTIVE),
None)
def _process_sync_ha_data(self, context, routers, host):
routers_dict = dict((router['id'], router) for router in routers)
bindings = self.get_ha_router_port_bindings(context,
routers_dict.keys(),
host)
for binding in bindings:
port_dict = self._core_plugin._make_port_dict(binding.port)
router = routers_dict.get(binding.router_id)
router[constants.HA_INTERFACE_KEY] = port_dict
router[constants.HA_ROUTER_STATE_KEY] = binding.state
for router in routers_dict.values():
interface = router.get(constants.HA_INTERFACE_KEY)
if interface:
self._populate_subnets_for_ports(context, [interface])
return list(routers_dict.values())
def get_ha_sync_data_for_host(self, context, host=None, router_ids=None,
active=None):
if n_utils.is_extension_supported(self,
constants.L3_DISTRIBUTED_EXT_ALIAS):
# DVR has to be handled differently
agent = self._get_agent_by_type_and_host(context,
constants.AGENT_TYPE_L3,
host)
sync_data = self._get_dvr_sync_data(context, host, agent,
router_ids, active)
else:
sync_data = super(L3_HA_NAT_db_mixin, self).get_sync_data(context,
router_ids, active)
return self._process_sync_ha_data(context, sync_data, host)
@classmethod
def _set_router_states(cls, context, bindings, states):
for binding in bindings:
try:
with context.session.begin(subtransactions=True):
binding.state = states[binding.router_id]
except (orm.exc.StaleDataError, orm.exc.ObjectDeletedError):
# Take concurrently deleted routers in to account
pass
def update_routers_states(self, context, states, host):
"""Receive dict of router ID to state and update them all."""
bindings = self.get_ha_router_port_bindings(
context, router_ids=states.keys(), host=host)
self._set_router_states(context, bindings, states)
self._update_router_port_bindings(context, states, host)
def _update_router_port_bindings(self, context, states, host):
admin_ctx = context.elevated()
device_filter = {'device_id': states.keys(),
'device_owner':
[constants.DEVICE_OWNER_ROUTER_INTF]}
ports = self._core_plugin.get_ports(admin_ctx, filters=device_filter)
active_ports = (port for port in ports
if states[port['device_id']] == constants.HA_ROUTER_STATE_ACTIVE)
for port in active_ports:
port[portbindings.HOST_ID] = host
try:
self._core_plugin.update_port(admin_ctx, port['id'],
{attributes.PORT: port})
except (orm.exc.StaleDataError, orm.exc.ObjectDeletedError):
# Take concurrently deleted interfaces in to account
pass