vmware-nsx/vmware_nsx/services/vpnaas/nsxv/ipsec_validator.py

128 lines
5.5 KiB
Python

# Copyright 2016 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from neutron_vpnaas.db.vpn import vpn_validator
from oslo_log import log as logging
from vmware_nsx._i18n import _
from vmware_nsx.common import exceptions as nsxv_exc
from vmware_nsx.common import nsxv_constants
LOG = logging.getLogger(__name__)
class IPsecValidator(vpn_validator.VpnReferenceValidator):
"""Validator methods for Vmware VPN support"""
def __init__(self, service_plugin):
super(IPsecValidator, self).__init__()
self.vpn_plugin = service_plugin
def validate_ikepolicy_version(self, policy_info):
"""NSX Edge provides IKEv1"""
version = policy_info.get('ike_version')
if version != 'v1':
msg = _("Unsupported ike policy %s! only v1 "
"is supported right now.") % version
raise nsxv_exc.NsxVpnValidationError(details=msg)
def validate_ikepolicy_pfs(self, policy_info):
# Check whether pfs is allowed.
if not nsxv_constants.PFS_MAP.get(policy_info['pfs']):
msg = _("Unsupported pfs: %(pfs)s! currently only "
"the following pfs are supported on VSE: %s") % {
'pfs': policy_info['pfs'],
'supported': nsxv_constants.PFS_MAP}
raise nsxv_exc.NsxVpnValidationError(details=msg)
def validate_encryption_algorithm(self, policy_info):
encryption = policy_info['encryption_algorithm']
if encryption not in nsxv_constants.ENCRYPTION_ALGORITHM_MAP:
msg = _("Unsupported encryption_algorithm: %(algo)s! please "
"select one of the following supported algorithms: "
"%(supported_algos)s") % {
'algo': encryption,
'supported_algos':
nsxv_constants.ENCRYPTION_ALGORITHM_MAP}
raise nsxv_exc.NsxVpnValidationError(details=msg)
def validate_ipsec_policy(self, context, policy_info):
"""Ensure IPSec policy encap mode is tunnel for current REST API."""
mode = policy_info['encapsulation_mode']
if mode not in nsxv_constants.ENCAPSULATION_MODE_ALLOWED:
msg = _("Unsupported encapsulation mode: %s! currently only"
"'tunnel' mode is supported.") % mode
raise nsxv_exc.NsxVpnValidationError(details=msg)
def validate_policies_matching_algorithms(self, ikepolicy, ipsecpolicy):
# In VSE, Phase 1 and Phase 2 share the same encryption_algorithm
# and authentication algorithms setting. At present, just record the
# discrepancy error in log and take ipsecpolicy to do configuration.
keys = ('auth_algorithm', 'encryption_algorithm', 'pfs')
for key in keys:
if ikepolicy[key] != ipsecpolicy[key]:
LOG.warning("IKEPolicy and IPsecPolicy should have consistent "
"auth_algorithm, encryption_algorithm and pfs for "
"VSE!")
break
def _is_shared_router(self, router):
return router.get('router_type') == constants.SHARED
def _validate_router(self, context, router_id):
# Only support distributed and exclusive router type
router = self.core_plugin.get_router(context, router_id)
if self._is_shared_router(router):
msg = _("Router type is not supported for VPN service, only "
"support distributed and exclusive router")
raise nsxv_exc.NsxVpnValidationError(details=msg)
def validate_vpnservice(self, context, vpnservice):
"""Called upon create/update of a service"""
# Call general validations
super(IPsecValidator, self).validate_vpnservice(
context, vpnservice)
# Call specific NSX validations
self._validate_router(context, vpnservice['router_id'])
if not vpnservice['subnet_id']:
# we currently do not support multiple subnets so a subnet must
# be defined
msg = _("Subnet must be defined in a service")
raise nsxv_exc.NsxVpnValidationError(details=msg)
def validate_ipsec_site_connection(self, context, ipsec_site_conn):
ike_policy_id = ipsec_site_conn.get('ikepolicy_id')
if ike_policy_id:
ikepolicy = self.vpn_plugin.get_ikepolicy(context,
ike_policy_id)
self.validate_ikepolicy_version(ikepolicy)
self.validate_ikepolicy_pfs(ikepolicy)
self.validate_encryption_algorithm(ikepolicy)
ipsec_policy_id = ipsec_site_conn.get('ipsecpolicy_id')
if ipsec_policy_id:
ipsecpolicy = self.vpn_plugin.get_ipsecpolicy(context,
ipsec_policy_id)
self.validate_ipsec_policy(context, ipsecpolicy)
if ike_policy_id and ipsec_policy_id:
self.validate_policies_matching_algorithms(ikepolicy, ipsecpolicy)