vmware-nsx/vmware_nsx/plugins/nsx_v/vshield/securitygroup_utils.py

246 lines
9.6 KiB
Python

# Copyright 2014 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import xml.etree.ElementTree as et
from oslo_config import cfg
from oslo_log import log as logging
from vmware_nsx.common import utils
WAIT_INTERVAL = 2000
MAX_ATTEMPTS = 5
LOG = logging.getLogger(__name__)
class NsxSecurityGroupUtils(object):
def __init__(self, nsxv_manager):
LOG.debug("Start Security Group Utils initialization")
self.nsxv_manager = nsxv_manager
def to_xml_string(self, element):
return et.tostring(element)
def get_section_with_rules(self, name, rules, section_id=None):
"""Helper method to create section dict with rules."""
section = et.Element('section')
section.attrib['name'] = name
if section_id:
section.attrib['id'] = section_id
for rule in rules:
section.append(rule)
return section
def get_container(self, nsx_sg_id):
container = {'type': 'SecurityGroup', 'value': nsx_sg_id}
return container
def get_remote_container(self, remote_group_id, remote_ip_mac):
container = None
if remote_group_id is not None:
return self.get_container(remote_group_id)
if remote_ip_mac is not None:
container = {'type': 'Ipv4Address', 'value': remote_ip_mac}
return container
def get_rule_config(self, applied_to_ids, name, action='allow',
applied_to='SecurityGroup',
source=None, destination=None, services=None,
flags=None, logged=False, tag=None,
application_services=None, notes=None):
"""Helper method to create a nsx rule dict."""
ruleTag = et.Element('rule')
ruleTag.attrib['logged'] = 'true' if logged else 'false'
nameTag = et.SubElement(ruleTag, 'name')
nameTag.text = name
actionTag = et.SubElement(ruleTag, 'action')
actionTag.text = action
notesTag = et.SubElement(ruleTag, 'notes')
notesTag.text = notes
apList = et.SubElement(ruleTag, 'appliedToList')
for applied_to_id in applied_to_ids:
apTag = et.SubElement(apList, 'appliedTo')
apTypeTag = et.SubElement(apTag, 'type')
apTypeTag.text = applied_to
apValueTag = et.SubElement(apTag, 'value')
apValueTag.text = applied_to_id
if source is not None:
sources = et.SubElement(ruleTag, 'sources')
sources.attrib['excluded'] = 'false'
srcTag = et.SubElement(sources, 'source')
srcTypeTag = et.SubElement(srcTag, 'type')
srcTypeTag.text = source['type']
srcValueTag = et.SubElement(srcTag, 'value')
srcValueTag.text = source['value']
if destination is not None:
dests = et.SubElement(ruleTag, 'destinations')
dests.attrib['excluded'] = 'false'
destTag = et.SubElement(dests, 'destination')
destTypeTag = et.SubElement(destTag, 'type')
destTypeTag.text = destination['type']
destValueTag = et.SubElement(destTag, 'value')
destValueTag.text = destination['value']
if services:
s = et.SubElement(ruleTag, 'services')
for protocol, port, icmptype, icmpcode in services:
svcTag = et.SubElement(s, 'service')
try:
int(protocol)
svcProtocolTag = et.SubElement(svcTag, 'protocol')
svcProtocolTag.text = str(protocol)
except ValueError:
svcProtocolTag = et.SubElement(svcTag, 'protocolName')
svcProtocolTag.text = protocol
if port is not None:
svcPortTag = et.SubElement(svcTag, 'destinationPort')
svcPortTag.text = str(port)
if icmptype is not None:
svcPortTag = et.SubElement(svcTag, 'subProtocol')
svcPortTag.text = str(icmptype)
if icmpcode is not None:
if icmptype in ('0', '8') and icmpcode == '0':
# icmpcode 0 should not be sent
# TODO(asarfaty): Validate if this is needed for all
# NSX versions and all icmp types
pass
else:
svcPortTag = et.SubElement(svcTag, 'icmpCode')
svcPortTag.text = str(icmpcode)
if application_services:
s = et.SubElement(ruleTag, 'services')
for application_service in application_services:
svcTag = et.SubElement(s, 'service')
svcProtocolTag = et.SubElement(svcTag, 'value')
svcProtocolTag.text = str(application_service)
if flags:
if flags.get('ethertype') is not None:
pktTag = et.SubElement(ruleTag, 'packetType')
pktTag.text = flags.get('ethertype')
if flags.get('direction') is not None:
dirTag = et.SubElement(ruleTag, 'direction')
dirTag.text = flags.get('direction')
if tag:
tagTag = et.SubElement(ruleTag, 'tag')
tagTag.text = tag
return ruleTag
def get_rule_id_pair_from_section(self, resp):
root = et.fromstring(resp)
pairs = []
for rule in root.findall('rule'):
pair = {'nsx_id': rule.attrib.get('id'),
'neutron_id': rule.find('name').text}
pairs.append(pair)
return pairs
def fix_existing_section_rules(self, section):
# fix section existing rules before extending it with new rules
# TODO(asarfaty): Validate if this is needed for all NSX versions
for rule in section.iter('rule'):
services = rule.find('services')
if services:
for service in services:
subProt = service.find('subProtocolName')
icmpCode = service.find('icmpCode')
if (icmpCode is not None and icmpCode.text == '0' and
subProt is not None and
subProt.text in ('echo-request', 'echo-reply')):
# ICMP code should not exist in the payload
service.remove(icmpCode)
def extend_section_with_rules(self, section, nsx_rules):
section.extend(nsx_rules)
def parse_section(self, xml_string):
return et.fromstring(xml_string)
def get_nsx_sg_name(self, sg_data):
try:
return cfg.CONF.nsxv.nsx_sg_name_format % sg_data
except Exception as e:
# Illegal format:
LOG.error("get_nsx_sg_name failed due to invalid format %s: %s",
cfg.CONF.nsxv.nsx_sg_name_format, e)
return '%(name)s (%(id)s)' % sg_data
def get_nsx_section_name(self, sg_data):
return 'SG Section: %s' % self.get_nsx_sg_name(sg_data)
def parse_and_get_section_id(self, section_xml):
section = et.fromstring(section_xml)
return section.attrib['id']
def is_section_logged(self, section):
# Determine if this section rules are being logged by the first rule
# 'logged' value.
rule = section.find('rule')
if rule is not None:
return rule.attrib.get('logged') == 'true'
return False
def set_rules_logged_option(self, section, logged):
value = 'true' if logged else 'false'
rules = section.findall('rule')
updated = False
for rule in rules:
if rule.attrib['logged'] != value:
rule.attrib['logged'] = value
updated = True
return updated
def del_nsx_security_group_from_policy(self, policy_id, sg_id):
if not policy_id:
return
policy = self.nsxv_manager.vcns.get_security_policy(policy_id)
policy = utils.normalize_xml(policy)
# check if the security group is already bounded to the policy
for binding in policy.iter('securityGroupBinding'):
if binding.find('objectId').text == sg_id:
# delete this entry
policy.remove(binding)
return self.nsxv_manager.vcns.update_security_policy(
policy_id, et.tostring(policy))
def add_nsx_security_group_to_policy(self, policy_id, sg_id):
if not policy_id:
return
# Get the policy configuration
policy = self.nsxv_manager.vcns.get_security_policy(policy_id)
policy = utils.normalize_xml(policy)
# check if the security group is already bounded to the policy
for binding in policy.iter('securityGroupBinding'):
if binding.find('objectId').text == sg_id:
# Already there
return
# Add a new binding entry
new_binding = et.SubElement(policy, 'securityGroupBinding')
et.SubElement(new_binding, 'objectId').text = sg_id
return self.nsxv_manager.vcns.update_security_policy(
policy_id, et.tostring(policy))