Hyper-V Agent decomposition

Decomposes Hyper-V Neutron Agent and adds the requirements.txt
file to set the dependency to the vendor agent.

Adds README containing information on how to properly use the
Hyper-V Neutron Agent and the ML2 Mechanism Driver.

Vendor plugin code is available on stackforge:
https://github.com/stackforge/networking-hyperv

Plugin already available in pypi:
https://pypi.python.org/pypi/networking-hyperv

DocImpact

Partially-implements: blueprint core-vendor-decomposition

Change-Id: Iedff2718732c884c297cb0be855593057bd64c38
This commit is contained in:
Claudiu Belu 2015-02-02 23:09:33 +02:00
parent 9ac4c3ad93
commit f39d96d3af
20 changed files with 260 additions and 2434 deletions

View File

@ -410,7 +410,7 @@ will be removed. The following aspects are captured:
+-------------------------------+-----------------------+-----------+------------------+---------+--------------+
| networking-cisco_ | core,ml2,l3,fw,vpn | yes | yes | [B] | |
+-------------------------------+-----------------------+-----------+------------------+---------+--------------+
| networking-hyperv_ | | | | | |
| networking-hyperv_ | ml2 | yes | yes | [C] | Kilo |
+-------------------------------+-----------------------+-----------+------------------+---------+--------------+
| networking-ibm_ | ml2,l3 | yes | no | [B] | Kilo |
+-------------------------------+-----------------------+-----------+------------------+---------+--------------+
@ -472,6 +472,12 @@ Cisco
.. _networking-hyperv:
Hyper-V
-------
* Git: https://github.com/stackforge/networking-hyperv
* Launchpad: https://launchpad.net/networking-hyperv
* PyPi: https://pypi.python.org/pypi/networking-hyperv
.. _networking-ibm:

View File

@ -0,0 +1,44 @@
# Copyright 2013 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from oslo_config import cfg
from oslo_log import log as logging
from neutron.agent.common import config
from neutron.common import config as common_config
from neutron.i18n import _LI
from neutron.plugins.hyperv.agent import config as hyperv_config
from neutron.plugins.hyperv.agent import l2_agent
LOG = logging.getLogger(__name__)
def register_options():
config.register_agent_state_opts_helper(cfg.CONF)
cfg.CONF.register_opts(hyperv_config.HYPERV_AGENT_OPTS, "AGENT")
def main():
register_options()
common_config.init(sys.argv[1:])
config.setup_logging()
hyperv_agent = l2_agent.HyperVNeutronAgent()
# Start everything.
LOG.info(_LI("Agent initialized successfully, now running... "))
hyperv_agent.daemon_loop()

View File

@ -0,0 +1,47 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
HYPERV_AGENT_OPTS = [
cfg.ListOpt(
'physical_network_vswitch_mappings',
default=[],
help=_('List of <physical_network>:<vswitch> '
'where the physical networks can be expressed with '
'wildcards, e.g.: ."*:external"')),
cfg.StrOpt(
'local_network_vswitch',
default='private',
help=_('Private vswitch name used for local networks')),
cfg.IntOpt('polling_interval', default=2,
help=_("The number of seconds the agent will wait between "
"polling for local device changes.")),
cfg.BoolOpt('enable_metrics_collection',
default=False,
help=_('Enables metrics collections for switch ports by using '
'Hyper-V\'s metric APIs. Collected data can by '
'retrieved by other apps and services, e.g.: '
'Ceilometer. Requires Hyper-V / Windows Server 2012 '
'and above')),
cfg.IntOpt('metrics_max_retries',
default=100,
help=_('Specifies the maximum number of retries to enable '
'Hyper-V\'s port metrics collection. The agent will try '
'to enable the feature once every polling_interval '
'period for at most metrics_max_retries or until it '
'succeedes.'))
]

View File

@ -1,464 +0,0 @@
#Copyright 2013 Cloudbase Solutions SRL
#Copyright 2013 Pedro Navarro Perez
#All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import platform
import re
import sys
import time
import eventlet
eventlet.monkey_patch()
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from neutron.agent.common import config
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as common_config
from neutron.common import constants as n_const
from neutron.common import topics
from neutron import context
from neutron.i18n import _LE, _LI
from neutron.openstack.common import loopingcall
from neutron.plugins.common import constants as p_const
from neutron.plugins.hyperv.agent import utils
from neutron.plugins.hyperv.agent import utilsfactory
from neutron.plugins.hyperv.common import constants
LOG = logging.getLogger(__name__)
agent_opts = [
cfg.ListOpt(
'physical_network_vswitch_mappings',
default=[],
help=_('List of <physical_network>:<vswitch> '
'where the physical networks can be expressed with '
'wildcards, e.g.: ."*:external"')),
cfg.StrOpt(
'local_network_vswitch',
default='private',
help=_('Private vswitch name used for local networks')),
cfg.IntOpt('polling_interval', default=2,
help=_("The number of seconds the agent will wait between "
"polling for local device changes.")),
cfg.BoolOpt('enable_metrics_collection',
default=False,
help=_('Enables metrics collections for switch ports by using '
'Hyper-V\'s metric APIs. Collected data can by '
'retrieved by other apps and services, e.g.: '
'Ceilometer. Requires Hyper-V / Windows Server 2012 '
'and above')),
cfg.IntOpt('metrics_max_retries',
default=100,
help=_('Specifies the maximum number of retries to enable '
'Hyper-V\'s port metrics collection. The agent will try '
'to enable the feature once every polling_interval '
'period for at most metrics_max_retries or until it '
'succeedes.'))
]
CONF = cfg.CONF
CONF.register_opts(agent_opts, "AGENT")
config.register_agent_state_opts_helper(cfg.CONF)
class HyperVSecurityAgent(sg_rpc.SecurityGroupAgentRpc):
def __init__(self, context, plugin_rpc):
super(HyperVSecurityAgent, self).__init__(context, plugin_rpc)
if sg_rpc.is_firewall_enabled():
self._setup_rpc()
@property
def use_enhanced_rpc(self):
return False
def _setup_rpc(self):
self.topic = topics.AGENT
self.endpoints = [HyperVSecurityCallbackMixin(self)]
consumers = [[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
class HyperVSecurityCallbackMixin(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
target = oslo_messaging.Target(version='1.1')
def __init__(self, sg_agent):
super(HyperVSecurityCallbackMixin, self).__init__()
self.sg_agent = sg_agent
class HyperVNeutronAgent(object):
# Set RPC API version to 1.1 by default.
target = oslo_messaging.Target(version='1.1')
def __init__(self):
super(HyperVNeutronAgent, self).__init__()
self._utils = utilsfactory.get_hypervutils()
self._polling_interval = CONF.AGENT.polling_interval
self._load_physical_network_mappings()
self._network_vswitch_map = {}
self._port_metric_retries = {}
self._set_agent_state()
self._setup_rpc()
def _set_agent_state(self):
self.agent_state = {
'binary': 'neutron-hyperv-agent',
'host': cfg.CONF.host,
'topic': n_const.L2_AGENT_TOPIC,
'configurations': {'vswitch_mappings':
self._physical_network_mappings},
'agent_type': n_const.AGENT_TYPE_HYPERV,
'start_flag': True}
def _report_state(self):
try:
self.state_rpc.report_state(self.context,
self.agent_state)
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception(_LE("Failed reporting state!"))
def _setup_rpc(self):
self.agent_id = 'hyperv_%s' % platform.node()
self.topic = topics.AGENT
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.endpoints = [self]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[topics.PORT, topics.DELETE],
[constants.TUNNEL, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
self.sec_groups_agent = HyperVSecurityAgent(
self.context, self.sg_plugin_rpc)
report_interval = CONF.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
heartbeat.start(interval=report_interval)
def _load_physical_network_mappings(self):
self._physical_network_mappings = {}
for mapping in CONF.AGENT.physical_network_vswitch_mappings:
parts = mapping.split(':')
if len(parts) != 2:
LOG.debug('Invalid physical network mapping: %s', mapping)
else:
pattern = re.escape(parts[0].strip()).replace('\\*', '.*')
vswitch = parts[1].strip()
self._physical_network_mappings[pattern] = vswitch
def _get_vswitch_for_physical_network(self, phys_network_name):
for pattern in self._physical_network_mappings:
if phys_network_name is None:
phys_network_name = ''
if re.match(pattern, phys_network_name):
return self._physical_network_mappings[pattern]
# Not found in the mappings, the vswitch has the same name
return phys_network_name
def _get_network_vswitch_map_by_port_id(self, port_id):
for network_id, map in self._network_vswitch_map.iteritems():
if port_id in map['ports']:
return (network_id, map)
def network_delete(self, context, network_id=None):
LOG.debug("network_delete received. "
"Deleting network %s", network_id)
# The network may not be defined on this agent
if network_id in self._network_vswitch_map:
self._reclaim_local_network(network_id)
else:
LOG.debug("Network %s not defined on agent.", network_id)
def port_delete(self, context, port_id=None):
LOG.debug("port_delete received")
self._port_unbound(port_id)
def port_update(self, context, port=None, network_type=None,
segmentation_id=None, physical_network=None):
LOG.debug("port_update received")
if CONF.SECURITYGROUP.enable_security_group:
if 'security_groups' in port:
self.sec_groups_agent.refresh_firewall()
self._treat_vif_port(
port['id'], port['network_id'],
network_type, physical_network,
segmentation_id, port['admin_state_up'])
def _get_vswitch_name(self, network_type, physical_network):
if network_type != p_const.TYPE_LOCAL:
vswitch_name = self._get_vswitch_for_physical_network(
physical_network)
else:
vswitch_name = CONF.AGENT.local_network_vswitch
return vswitch_name
def _provision_network(self, port_id,
net_uuid, network_type,
physical_network,
segmentation_id):
LOG.info(_LI("Provisioning network %s"), net_uuid)
vswitch_name = self._get_vswitch_name(network_type, physical_network)
if network_type in [p_const.TYPE_VLAN, p_const.TYPE_FLAT]:
#Nothing to do
pass
elif network_type == p_const.TYPE_LOCAL:
#TODO(alexpilotti): Check that the switch type is private
#or create it if not existing
pass
else:
raise utils.HyperVException(
msg=(_("Cannot provision unknown network type %(network_type)s"
" for network %(net_uuid)s") %
dict(network_type=network_type, net_uuid=net_uuid)))
map = {
'network_type': network_type,
'vswitch_name': vswitch_name,
'ports': [],
'vlan_id': segmentation_id}
self._network_vswitch_map[net_uuid] = map
def _reclaim_local_network(self, net_uuid):
LOG.info(_LI("Reclaiming local network %s"), net_uuid)
del self._network_vswitch_map[net_uuid]
def _port_bound(self, port_id,
net_uuid,
network_type,
physical_network,
segmentation_id):
LOG.debug("Binding port %s", port_id)
if net_uuid not in self._network_vswitch_map:
self._provision_network(
port_id, net_uuid, network_type,
physical_network, segmentation_id)
map = self._network_vswitch_map[net_uuid]
map['ports'].append(port_id)
self._utils.connect_vnic_to_vswitch(map['vswitch_name'], port_id)
if network_type == p_const.TYPE_VLAN:
LOG.info(_LI('Binding VLAN ID %(segmentation_id)s '
'to switch port %(port_id)s'),
dict(segmentation_id=segmentation_id, port_id=port_id))
self._utils.set_vswitch_port_vlan_id(
segmentation_id,
port_id)
elif network_type == p_const.TYPE_FLAT:
#Nothing to do
pass
elif network_type == p_const.TYPE_LOCAL:
#Nothing to do
pass
else:
LOG.error(_LE('Unsupported network type %s'), network_type)
if CONF.AGENT.enable_metrics_collection:
self._utils.enable_port_metrics_collection(port_id)
self._port_metric_retries[port_id] = CONF.AGENT.metrics_max_retries
def _port_unbound(self, port_id):
(net_uuid, map) = self._get_network_vswitch_map_by_port_id(port_id)
if net_uuid not in self._network_vswitch_map:
LOG.info(_LI('Network %s is not avalailable on this agent'),
net_uuid)
return
LOG.debug("Unbinding port %s", port_id)
self._utils.disconnect_switch_port(map['vswitch_name'], port_id, True)
if not map['ports']:
self._reclaim_local_network(net_uuid)
def _port_enable_control_metrics(self):
if not CONF.AGENT.enable_metrics_collection:
return
for port_id in self._port_metric_retries.keys():
if self._utils.can_enable_control_metrics(port_id):
self._utils.enable_control_metrics(port_id)
LOG.info(_LI('Port metrics enabled for port: %s'), port_id)
del self._port_metric_retries[port_id]
elif self._port_metric_retries[port_id] < 1:
self._utils.enable_control_metrics(port_id)
LOG.error(_LE('Port metrics raw enabling for port: %s'),
port_id)
del self._port_metric_retries[port_id]
else:
self._port_metric_retries[port_id] -= 1
def _update_ports(self, registered_ports):
ports = self._utils.get_vnic_ids()
if ports == registered_ports:
return
added = ports - registered_ports
removed = registered_ports - ports
return {'current': ports,
'added': added,
'removed': removed}
def _treat_vif_port(self, port_id, network_id, network_type,
physical_network, segmentation_id,
admin_state_up):
if self._utils.vnic_port_exists(port_id):
if admin_state_up:
self._port_bound(port_id, network_id, network_type,
physical_network, segmentation_id)
else:
self._port_unbound(port_id)
else:
LOG.debug("No port %s defined on agent.", port_id)
def _treat_devices_added(self, devices):
try:
devices_details_list = self.plugin_rpc.get_devices_details_list(
self.context,
devices,
self.agent_id)
except Exception as e:
LOG.debug("Unable to get ports details for "
"devices %(devices)s: %(e)s",
{'devices': devices, 'e': e})
# resync is needed
return True
for device_details in devices_details_list:
device = device_details['device']
LOG.info(_LI("Adding port %s"), device)
if 'port_id' in device_details:
LOG.info(_LI("Port %(device)s updated. Details: "
"%(device_details)s"),
{'device': device, 'device_details': device_details})
self._treat_vif_port(
device_details['port_id'],
device_details['network_id'],
device_details['network_type'],
device_details['physical_network'],
device_details['segmentation_id'],
device_details['admin_state_up'])
# check if security groups is enabled.
# if not, teardown the security group rules
if CONF.SECURITYGROUP.enable_security_group:
self.sec_groups_agent.prepare_devices_filter([device])
else:
self._utils.remove_all_security_rules(
device_details['port_id'])
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id,
cfg.CONF.host)
return False
def _treat_devices_removed(self, devices):
resync = False
for device in devices:
LOG.info(_LI("Removing port %s"), device)
try:
self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id,
cfg.CONF.host)
except Exception as e:
LOG.debug("Removing port failed for device %(device)s: %(e)s",
dict(device=device, e=e))
resync = True
continue
self._port_unbound(device)
return resync
def _process_network_ports(self, port_info):
resync_a = False
resync_b = False
if 'added' in port_info:
resync_a = self._treat_devices_added(port_info['added'])
if 'removed' in port_info:
resync_b = self._treat_devices_removed(port_info['removed'])
# If one of the above operations fails => resync with plugin
return (resync_a | resync_b)
def daemon_loop(self):
sync = True
ports = set()
while True:
try:
start = time.time()
if sync:
LOG.info(_LI("Agent out of sync with plugin!"))
ports.clear()
sync = False
port_info = self._update_ports(ports)
# notify plugin about port deltas
if port_info:
LOG.debug("Agent loop has new devices!")
# If treat devices fails - must resync with plugin
sync = self._process_network_ports(port_info)
ports = port_info['current']
self._port_enable_control_metrics()
except Exception:
LOG.exception(_LE("Error in agent event loop"))
sync = True
# sleep till end of polling interval
elapsed = (time.time() - start)
if (elapsed < self._polling_interval):
time.sleep(self._polling_interval - elapsed)
else:
LOG.debug("Loop iteration exceeded interval "
"(%(polling_interval)s vs. %(elapsed)s)",
{'polling_interval': self._polling_interval,
'elapsed': elapsed})
def main():
common_config.init(sys.argv[1:])
common_config.setup_logging()
plugin = HyperVNeutronAgent()
# Start everything.
LOG.info(_LI("Agent initialized successfully, now running... "))
plugin.daemon_loop()

View File

@ -0,0 +1,125 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import platform
from hyperv.neutron import hyperv_neutron_agent
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import constants as n_const
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron import context
from neutron.i18n import _LE
from neutron.openstack.common import loopingcall
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
# Topic for tunnel notifications between the plugin and agent
TUNNEL = 'tunnel'
class HyperVSecurityAgent(sg_rpc.SecurityGroupAgentRpc):
def __init__(self, context, plugin_rpc):
super(HyperVSecurityAgent, self).__init__(context, plugin_rpc)
if sg_rpc.is_firewall_enabled():
self._setup_rpc()
@property
def use_enhanced_rpc(self):
return False
def _setup_rpc(self):
self.topic = topics.AGENT
self.endpoints = [HyperVSecurityCallbackMixin(self)]
consumers = [[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
class HyperVSecurityCallbackMixin(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
target = oslo_messaging.Target(version='1.1')
def __init__(self, sg_agent):
super(HyperVSecurityCallbackMixin, self).__init__()
self.sg_agent = sg_agent
class HyperVNeutronAgent(hyperv_neutron_agent.HyperVNeutronAgentMixin):
# Set RPC API version to 1.1 by default.
target = oslo_messaging.Target(version='1.1')
def __init__(self):
super(HyperVNeutronAgent, self).__init__(conf=CONF)
self._set_agent_state()
self._setup_rpc()
def _set_agent_state(self):
configurations = self.get_agent_configurations()
self.agent_state = {
'binary': 'neutron-hyperv-agent',
'host': CONF.host,
'topic': n_const.L2_AGENT_TOPIC,
'configurations': configurations,
'agent_type': n_const.AGENT_TYPE_HYPERV,
'start_flag': True}
def _report_state(self):
try:
self.state_rpc.report_state(self.context,
self.agent_state)
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception(_LE("Failed reporting state!"))
def _setup_rpc(self):
self.agent_id = 'hyperv_%s' % platform.node()
self.topic = topics.AGENT
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.endpoints = [self]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[topics.PORT, topics.DELETE],
[TUNNEL, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
self.client = n_rpc.get_client(self.target)
self.sec_groups_agent = HyperVSecurityAgent(
self.context, self.sg_plugin_rpc)
report_interval = CONF.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
heartbeat.start(interval=report_interval)

View File

@ -13,134 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from hyperv.neutron import security_groups_driver as sg_driver
from neutron.agent import firewall
from neutron.i18n import _LE, _LI
from neutron.plugins.hyperv.agent import utilsfactory
from neutron.plugins.hyperv.agent import utilsv2
LOG = logging.getLogger(__name__)
class HyperVSecurityGroupsDriver(firewall.FirewallDriver):
class HyperVSecurityGroupsDriver(sg_driver.HyperVSecurityGroupsDriverMixin,
firewall.FirewallDriver):
"""Security Groups Driver.
Security Groups implementation for Hyper-V VMs.
"""
_ACL_PROP_MAP = {
'direction': {'ingress': utilsv2.HyperVUtilsV2._ACL_DIR_IN,
'egress': utilsv2.HyperVUtilsV2._ACL_DIR_OUT},
'ethertype': {'IPv4': utilsv2.HyperVUtilsV2._ACL_TYPE_IPV4,
'IPv6': utilsv2.HyperVUtilsV2._ACL_TYPE_IPV6},
'protocol': {'icmp': utilsv2.HyperVUtilsV2._ICMP_PROTOCOL},
'default': "ANY",
'address_default': {'IPv4': '0.0.0.0/0', 'IPv6': '::/0'}
}
def __init__(self):
self._utils = utilsfactory.get_hypervutils()
self._security_ports = {}
def prepare_port_filter(self, port):
LOG.debug('Creating port %s rules', len(port['security_group_rules']))
# newly created port, add default rules.
if port['device'] not in self._security_ports:
LOG.debug('Creating default reject rules.')
self._utils.create_default_reject_all_rules(port['id'])
self._security_ports[port['device']] = port
self._create_port_rules(port['id'], port['security_group_rules'])
def _create_port_rules(self, port_id, rules):
for rule in rules:
param_map = self._create_param_map(rule)
try:
self._utils.create_security_rule(port_id, **param_map)
except Exception as ex:
LOG.error(_LE('Hyper-V Exception: %(hyperv_exeption)s while '
'adding rule: %(rule)s'),
dict(hyperv_exeption=ex, rule=rule))
def _remove_port_rules(self, port_id, rules):
for rule in rules:
param_map = self._create_param_map(rule)
try:
self._utils.remove_security_rule(port_id, **param_map)
except Exception as ex:
LOG.error(_LE('Hyper-V Exception: %(hyperv_exeption)s while '
'removing rule: %(rule)s'),
dict(hyperv_exeption=ex, rule=rule))
def _create_param_map(self, rule):
if 'port_range_min' in rule and 'port_range_max' in rule:
local_port = '%s-%s' % (rule['port_range_min'],
rule['port_range_max'])
else:
local_port = self._ACL_PROP_MAP['default']
return {
'direction': self._ACL_PROP_MAP['direction'][rule['direction']],
'acl_type': self._ACL_PROP_MAP['ethertype'][rule['ethertype']],
'local_port': local_port,
'protocol': self._get_rule_protocol(rule),
'remote_address': self._get_rule_remote_address(rule)
}
def apply_port_filter(self, port):
LOG.info(_LI('Aplying port filter.'))
def update_port_filter(self, port):
LOG.info(_LI('Updating port rules.'))
if port['device'] not in self._security_ports:
self.prepare_port_filter(port)
return
old_port = self._security_ports[port['device']]
rules = old_port['security_group_rules']
param_port_rules = port['security_group_rules']
new_rules = [r for r in param_port_rules if r not in rules]
remove_rules = [r for r in rules if r not in param_port_rules]
LOG.info(_LI("Creating %(new)s new rules, removing %(old)s "
"old rules."),
{'new': len(new_rules), 'old': len(remove_rules)})
self._remove_port_rules(old_port['id'], remove_rules)
self._create_port_rules(port['id'], new_rules)
self._security_ports[port['device']] = port
def remove_port_filter(self, port):
LOG.info(_LI('Removing port filter'))
self._security_ports.pop(port['device'], None)
@property
def ports(self):
return self._security_ports
def _get_rule_remote_address(self, rule):
if rule['direction'] is 'ingress':
ip_prefix = 'source_ip_prefix'
else:
ip_prefix = 'dest_ip_prefix'
if ip_prefix in rule:
return rule[ip_prefix]
return self._ACL_PROP_MAP['address_default'][rule['ethertype']]
def _get_rule_protocol(self, rule):
protocol = self._get_rule_prop_or_default(rule, 'protocol')
if protocol in self._ACL_PROP_MAP['protocol'].keys():
return self._ACL_PROP_MAP['protocol'][protocol]
return protocol
def _get_rule_prop_or_default(self, rule, prop):
if prop in rule:
return rule[prop]
return self._ACL_PROP_MAP['default']
pass

View File

@ -1,255 +0,0 @@
# Copyright 2013 Cloudbase Solutions SRL
# Copyright 2013 Pedro Navarro Perez
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import time
from oslo_config import cfg
from oslo_log import log as logging
from neutron.common import exceptions as n_exc
# Check needed for unit testing on Unix
if sys.platform == 'win32':
import wmi
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class HyperVException(n_exc.NeutronException):
message = _('HyperVException: %(msg)s')
WMI_JOB_STATE_STARTED = 4096
WMI_JOB_STATE_RUNNING = 4
WMI_JOB_STATE_COMPLETED = 7
class HyperVUtils(object):
_ETHERNET_SWITCH_PORT = 'Msvm_SwitchPort'
_wmi_namespace = '//./root/virtualization'
def __init__(self):
self._wmi_conn = None
@property
def _conn(self):
if self._wmi_conn is None:
self._wmi_conn = wmi.WMI(moniker=self._wmi_namespace)
return self._wmi_conn
def get_switch_ports(self, vswitch_name):
vswitch = self._get_vswitch(vswitch_name)
vswitch_ports = vswitch.associators(
wmi_result_class=self._ETHERNET_SWITCH_PORT)
return set(p.Name for p in vswitch_ports)
def vnic_port_exists(self, port_id):
try:
self._get_vnic_settings(port_id)
except Exception:
return False
return True
def get_vnic_ids(self):
return set(
p.ElementName
for p in self._conn.Msvm_SyntheticEthernetPortSettingData()
if p.ElementName is not None)
def _get_vnic_settings(self, vnic_name):
vnic_settings = self._conn.Msvm_SyntheticEthernetPortSettingData(
ElementName=vnic_name)
if not vnic_settings:
raise HyperVException(msg=_('Vnic not found: %s') % vnic_name)
return vnic_settings[0]
def connect_vnic_to_vswitch(self, vswitch_name, switch_port_name):
vnic_settings = self._get_vnic_settings(switch_port_name)
if not vnic_settings.Connection or not vnic_settings.Connection[0]:
port = self.get_port_by_id(switch_port_name, vswitch_name)
if port:
port_path = port.Path_()
else:
port_path = self._create_switch_port(
vswitch_name, switch_port_name)
vnic_settings.Connection = [port_path]
self._modify_virt_resource(vnic_settings)
def _get_vm_from_res_setting_data(self, res_setting_data):
sd = res_setting_data.associators(
wmi_result_class='Msvm_VirtualSystemSettingData')
vm = sd[0].associators(
wmi_result_class='Msvm_ComputerSystem')
return vm[0]
def _modify_virt_resource(self, res_setting_data):
vm = self._get_vm_from_res_setting_data(res_setting_data)
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
(job_path, ret_val) = vs_man_svc.ModifyVirtualSystemResources(
vm.Path_(), [res_setting_data.GetText_(1)])
self._check_job_status(ret_val, job_path)
def _check_job_status(self, ret_val, jobpath):
"""Poll WMI job state for completion."""
if not ret_val:
return
elif ret_val not in [WMI_JOB_STATE_STARTED, WMI_JOB_STATE_RUNNING]:
raise HyperVException(msg=_('Job failed with error %d') % ret_val)
job_wmi_path = jobpath.replace('\\', '/')
job = wmi.WMI(moniker=job_wmi_path)
while job.JobState == WMI_JOB_STATE_RUNNING:
time.sleep(0.1)
job = wmi.WMI(moniker=job_wmi_path)
if job.JobState != WMI_JOB_STATE_COMPLETED:
job_state = job.JobState
if job.path().Class == "Msvm_ConcreteJob":
err_sum_desc = job.ErrorSummaryDescription
err_desc = job.ErrorDescription
err_code = job.ErrorCode
data = {'job_state': job_state,
'err_sum_desc': err_sum_desc,
'err_desc': err_desc,
'err_code': err_code}
raise HyperVException(
msg=_("WMI job failed with status %(job_state)d. "
"Error details: %(err_sum_desc)s - %(err_desc)s - "
"Error code: %(err_code)d") % data)
else:
(error, ret_val) = job.GetError()
if not ret_val and error:
data = {'job_state': job_state,
'error': error}
raise HyperVException(
msg=_("WMI job failed with status %(job_state)d. "
"Error details: %(error)s") % data)
else:
raise HyperVException(
msg=_("WMI job failed with status %d. "
"No error description available") % job_state)
desc = job.Description
elap = job.ElapsedTime
LOG.debug("WMI job succeeded: %(desc)s, Elapsed=%(elap)s",
{'desc': desc, 'elap': elap})
def _create_switch_port(self, vswitch_name, switch_port_name):
"""Creates a switch port."""
switch_svc = self._conn.Msvm_VirtualSwitchManagementService()[0]
vswitch_path = self._get_vswitch(vswitch_name).path_()
(new_port, ret_val) = switch_svc.CreateSwitchPort(
Name=switch_port_name,
FriendlyName=switch_port_name,
ScopeOfResidence="",
VirtualSwitch=vswitch_path)
if ret_val != 0:
raise HyperVException(
msg=_('Failed creating port for %s') % vswitch_name)
return new_port
def remove_all_security_rules(self, switch_port_name):
pass
def disconnect_switch_port(
self, vswitch_name, switch_port_name, delete_port):
"""Disconnects the switch port."""
switch_svc = self._conn.Msvm_VirtualSwitchManagementService()[0]
switch_port_path = self._get_switch_port_path_by_name(
switch_port_name)
if not switch_port_path:
# Port not found. It happens when the VM was already deleted.
return
(ret_val, ) = switch_svc.DisconnectSwitchPort(
SwitchPort=switch_port_path)
if ret_val != 0:
data = {'switch_port_name': switch_port_name,
'vswitch_name': vswitch_name,
'ret_val': ret_val}
raise HyperVException(
msg=_('Failed to disconnect port %(switch_port_name)s '
'from switch %(vswitch_name)s '
'with error %(ret_val)s') % data)
if delete_port:
(ret_val, ) = switch_svc.DeleteSwitchPort(
SwitchPort=switch_port_path)
if ret_val != 0:
data = {'switch_port_name': switch_port_name,
'vswitch_name': vswitch_name,
'ret_val': ret_val}
raise HyperVException(
msg=_('Failed to delete port %(switch_port_name)s '
'from switch %(vswitch_name)s '
'with error %(ret_val)s') % data)
def _get_vswitch(self, vswitch_name):
vswitch = self._conn.Msvm_VirtualSwitch(ElementName=vswitch_name)
if not vswitch:
raise HyperVException(msg=_('VSwitch not found: %s') %
vswitch_name)
return vswitch[0]
def _get_vswitch_external_port(self, vswitch):
vswitch_ports = vswitch.associators(
wmi_result_class=self._ETHERNET_SWITCH_PORT)
for vswitch_port in vswitch_ports:
lan_endpoints = vswitch_port.associators(
wmi_result_class='Msvm_SwitchLanEndpoint')
if lan_endpoints:
ext_port = lan_endpoints[0].associators(
wmi_result_class='Msvm_ExternalEthernetPort')
if ext_port:
return vswitch_port
def set_vswitch_port_vlan_id(self, vlan_id, switch_port_name):
vlan_endpoint_settings = self._conn.Msvm_VLANEndpointSettingData(
ElementName=switch_port_name)[0]
if vlan_endpoint_settings.AccessVLAN != vlan_id:
vlan_endpoint_settings.AccessVLAN = vlan_id
vlan_endpoint_settings.put()
def _get_switch_port_path_by_name(self, switch_port_name):
vswitch = self._conn.Msvm_SwitchPort(ElementName=switch_port_name)
if vswitch:
return vswitch[0].path_()
def get_vswitch_id(self, vswitch_name):
vswitch = self._get_vswitch(vswitch_name)
return vswitch.Name
def get_port_by_id(self, port_id, vswitch_name):
vswitch = self._get_vswitch(vswitch_name)
switch_ports = vswitch.associators(
wmi_result_class=self._ETHERNET_SWITCH_PORT)
for switch_port in switch_ports:
if (switch_port.ElementName == port_id):
return switch_port
def enable_port_metrics_collection(self, switch_port_name):
raise NotImplementedError(_("Metrics collection is not supported on "
"this version of Hyper-V"))
def enable_control_metrics(self, switch_port_name):
raise NotImplementedError(_("Metrics collection is not supported on "
"this version of Hyper-V"))
def can_enable_control_metrics(self, switch_port_name):
return False

View File

@ -1,71 +0,0 @@
# Copyright 2013 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from oslo_config import cfg
from oslo_log import log as logging
from neutron.i18n import _LW
from neutron.plugins.hyperv.agent import utils
from neutron.plugins.hyperv.agent import utilsv2
# Check needed for unit testing on Unix
if sys.platform == 'win32':
import wmi
hyper_opts = [
cfg.BoolOpt('force_hyperv_utils_v1',
default=False,
help=_('Force V1 WMI utility classes')),
]
CONF = cfg.CONF
CONF.register_opts(hyper_opts, 'hyperv')
LOG = logging.getLogger(__name__)
def _get_windows_version():
return wmi.WMI(moniker='//./root/cimv2').Win32_OperatingSystem()[0].Version
def _check_min_windows_version(major, minor, build=0):
version_str = _get_windows_version()
return map(int, version_str.split('.')) >= [major, minor, build]
def get_hypervutils():
# V1 virtualization namespace features are supported up to
# Windows Server / Hyper-V Server 2012
# V2 virtualization namespace features are supported starting with
# Windows Server / Hyper-V Server 2012
# Windows Server / Hyper-V Server 2012 R2 uses the V2 namespace and
# introduces additional features
force_v1_flag = CONF.hyperv.force_hyperv_utils_v1
if _check_min_windows_version(6, 3):
if force_v1_flag:
LOG.warning(_LW('V1 virtualization namespace no longer supported '
'on Windows Server / Hyper-V Server 2012 R2 or '
'above.'))
cls = utilsv2.HyperVUtilsV2R2
elif not force_v1_flag and _check_min_windows_version(6, 2):
cls = utilsv2.HyperVUtilsV2
else:
cls = utils.HyperVUtils
LOG.debug("Loading class: %(module_name)s.%(class_name)s",
{'module_name': cls.__module__, 'class_name': cls.__name__})
return cls()

View File

@ -1,435 +0,0 @@
# Copyright 2013 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.plugins.hyperv.agent import utils
class HyperVUtilsV2(utils.HyperVUtils):
_EXTERNAL_PORT = 'Msvm_ExternalEthernetPort'
_ETHERNET_SWITCH_PORT = 'Msvm_EthernetSwitchPort'
_PORT_ALLOC_SET_DATA = 'Msvm_EthernetPortAllocationSettingData'
_PORT_VLAN_SET_DATA = 'Msvm_EthernetSwitchPortVlanSettingData'
_PORT_SECURITY_SET_DATA = 'Msvm_EthernetSwitchPortSecuritySettingData'
_PORT_ALLOC_ACL_SET_DATA = 'Msvm_EthernetSwitchPortAclSettingData'
_PORT_EXT_ACL_SET_DATA = _PORT_ALLOC_ACL_SET_DATA
_LAN_ENDPOINT = 'Msvm_LANEndpoint'
_STATE_DISABLED = 3
_OPERATION_MODE_ACCESS = 1
_VIRTUAL_SYSTEM_SETTING_DATA = 'Msvm_VirtualSystemSettingData'
_VM_SUMMARY_ENABLED_STATE = 100
_HYPERV_VM_STATE_ENABLED = 2
_ACL_DIR_IN = 1
_ACL_DIR_OUT = 2
_ACL_TYPE_IPV4 = 2
_ACL_TYPE_IPV6 = 3
_ACL_ACTION_ALLOW = 1
_ACL_ACTION_DENY = 2
_ACL_ACTION_METER = 3
_METRIC_ENABLED = 2
_NET_IN_METRIC_NAME = 'Filtered Incoming Network Traffic'
_NET_OUT_METRIC_NAME = 'Filtered Outgoing Network Traffic'
_ACL_APPLICABILITY_LOCAL = 1
_ACL_APPLICABILITY_REMOTE = 2
_ACL_DEFAULT = 'ANY'
_IPV4_ANY = '0.0.0.0/0'
_IPV6_ANY = '::/0'
_TCP_PROTOCOL = 'tcp'
_UDP_PROTOCOL = 'udp'
_ICMP_PROTOCOL = '1'
_MAX_WEIGHT = 65500
# 2 directions x 2 address types = 4 ACLs
_REJECT_ACLS_COUNT = 4
_wmi_namespace = '//./root/virtualization/v2'
def __init__(self):
super(HyperVUtilsV2, self).__init__()
def connect_vnic_to_vswitch(self, vswitch_name, switch_port_name):
vnic = self._get_vnic_settings(switch_port_name)
vswitch = self._get_vswitch(vswitch_name)
port, found = self._get_switch_port_allocation(switch_port_name, True)
port.HostResource = [vswitch.path_()]
port.Parent = vnic.path_()
if not found:
vm = self._get_vm_from_res_setting_data(vnic)
self._add_virt_resource(vm, port)
else:
self._modify_virt_resource(port)
def _modify_virt_resource(self, res_setting_data):
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
(job_path, out_set_data, ret_val) = vs_man_svc.ModifyResourceSettings(
ResourceSettings=[res_setting_data.GetText_(1)])
self._check_job_status(ret_val, job_path)
def _add_virt_resource(self, vm, res_setting_data):
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
(job_path, out_set_data, ret_val) = vs_man_svc.AddResourceSettings(
vm.path_(), [res_setting_data.GetText_(1)])
self._check_job_status(ret_val, job_path)
def _remove_virt_resource(self, res_setting_data):
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
(job, ret_val) = vs_man_svc.RemoveResourceSettings(
ResourceSettings=[res_setting_data.path_()])
self._check_job_status(ret_val, job)
def _add_virt_feature(self, element, res_setting_data):
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
(job_path, out_set_data, ret_val) = vs_man_svc.AddFeatureSettings(
element.path_(), [res_setting_data.GetText_(1)])
self._check_job_status(ret_val, job_path)
def _remove_virt_feature(self, feature_resource):
self._remove_multiple_virt_features([feature_resource])
def _remove_multiple_virt_features(self, feature_resources):
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
(job_path, ret_val) = vs_man_svc.RemoveFeatureSettings(
FeatureSettings=[f.path_() for f in feature_resources])
self._check_job_status(ret_val, job_path)
def disconnect_switch_port(
self, vswitch_name, switch_port_name, delete_port):
"""Disconnects the switch port."""
sw_port, found = self._get_switch_port_allocation(switch_port_name)
if not sw_port:
# Port not found. It happens when the VM was already deleted.
return
if delete_port:
self._remove_virt_resource(sw_port)
else:
sw_port.EnabledState = self._STATE_DISABLED
self._modify_virt_resource(sw_port)
def _get_vswitch(self, vswitch_name):
vswitch = self._conn.Msvm_VirtualEthernetSwitch(
ElementName=vswitch_name)
if not len(vswitch):
raise utils.HyperVException(msg=_('VSwitch not found: %s') %
vswitch_name)
return vswitch[0]
def _get_vswitch_external_port(self, vswitch):
vswitch_ports = vswitch.associators(
wmi_result_class=self._ETHERNET_SWITCH_PORT)
for vswitch_port in vswitch_ports:
lan_endpoints = vswitch_port.associators(
wmi_result_class=self._LAN_ENDPOINT)
if len(lan_endpoints):
lan_endpoints = lan_endpoints[0].associators(
wmi_result_class=self._LAN_ENDPOINT)
if len(lan_endpoints):
ext_port = lan_endpoints[0].associators(
wmi_result_class=self._EXTERNAL_PORT)
if ext_port:
return vswitch_port
def set_vswitch_port_vlan_id(self, vlan_id, switch_port_name):
port_alloc, found = self._get_switch_port_allocation(switch_port_name)
if not found:
raise utils.HyperVException(
msg=_('Port Allocation not found: %s') % switch_port_name)
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
vlan_settings = self._get_vlan_setting_data_from_port_alloc(port_alloc)
if vlan_settings:
# Removing the feature because it cannot be modified
# due to a wmi exception.
(job_path, ret_val) = vs_man_svc.RemoveFeatureSettings(
FeatureSettings=[vlan_settings.path_()])
self._check_job_status(ret_val, job_path)
(vlan_settings, found) = self._get_vlan_setting_data(switch_port_name)
vlan_settings.AccessVlanId = vlan_id
vlan_settings.OperationMode = self._OPERATION_MODE_ACCESS
(job_path, out, ret_val) = vs_man_svc.AddFeatureSettings(
port_alloc.path_(), [vlan_settings.GetText_(1)])
self._check_job_status(ret_val, job_path)
def _get_vlan_setting_data_from_port_alloc(self, port_alloc):
return self._get_first_item(port_alloc.associators(
wmi_result_class=self._PORT_VLAN_SET_DATA))
def _get_vlan_setting_data(self, switch_port_name, create=True):
return self._get_setting_data(
self._PORT_VLAN_SET_DATA,
switch_port_name, create)
def _get_switch_port_allocation(self, switch_port_name, create=False):
return self._get_setting_data(
self._PORT_ALLOC_SET_DATA,
switch_port_name, create)
def _get_setting_data(self, class_name, element_name, create=True):
element_name = element_name.replace("'", '"')
q = self._conn.query("SELECT * FROM %(class_name)s WHERE "
"ElementName = '%(element_name)s'" %
{"class_name": class_name,
"element_name": element_name})
data = self._get_first_item(q)
found = data is not None
if not data and create:
data = self._get_default_setting_data(class_name)
data.ElementName = element_name
return data, found
def _get_default_setting_data(self, class_name):
return self._conn.query("SELECT * FROM %s WHERE InstanceID "
"LIKE '%%\\Default'" % class_name)[0]
def _get_first_item(self, obj):
if obj:
return obj[0]
def enable_port_metrics_collection(self, switch_port_name):
port, found = self._get_switch_port_allocation(switch_port_name, False)
if not found:
return
# Add the ACLs only if they don't already exist
acls = port.associators(wmi_result_class=self._PORT_ALLOC_ACL_SET_DATA)
for acl_type in [self._ACL_TYPE_IPV4, self._ACL_TYPE_IPV6]:
for acl_dir in [self._ACL_DIR_IN, self._ACL_DIR_OUT]:
_acls = self._filter_acls(
acls, self._ACL_ACTION_METER, acl_dir, acl_type)
if not _acls:
acl = self._create_acl(
acl_dir, acl_type, self._ACL_ACTION_METER)
self._add_virt_feature(port, acl)
def enable_control_metrics(self, switch_port_name):
port, found = self._get_switch_port_allocation(switch_port_name, False)
if not found:
return
metric_svc = self._conn.Msvm_MetricService()[0]
metric_names = [self._NET_IN_METRIC_NAME, self._NET_OUT_METRIC_NAME]
for metric_name in metric_names:
metric_def = self._conn.CIM_BaseMetricDefinition(Name=metric_name)
if metric_def:
metric_svc.ControlMetrics(
Subject=port.path_(),
Definition=metric_def[0].path_(),
MetricCollectionEnabled=self._METRIC_ENABLED)
def can_enable_control_metrics(self, switch_port_name):
port, found = self._get_switch_port_allocation(switch_port_name, False)
if not found:
return False
if not self._is_port_vm_started(port):
return False
# all 4 meter ACLs must be existent first. (2 x direction)
acls = port.associators(wmi_result_class=self._PORT_ALLOC_ACL_SET_DATA)
acls = [a for a in acls if a.Action == self._ACL_ACTION_METER]
if len(acls) < 2:
return False
return True
def _is_port_vm_started(self, port):
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
vmsettings = port.associators(
wmi_result_class=self._VIRTUAL_SYSTEM_SETTING_DATA)
#See http://msdn.microsoft.com/en-us/library/cc160706%28VS.85%29.aspx
(ret_val, summary_info) = vs_man_svc.GetSummaryInformation(
[self._VM_SUMMARY_ENABLED_STATE],
[v.path_() for v in vmsettings])
if ret_val or not summary_info:
raise utils.HyperVException(msg=_('Cannot get VM summary data '
'for: %s') % port.ElementName)
return summary_info[0].EnabledState is self._HYPERV_VM_STATE_ENABLED
def create_security_rule(self, switch_port_name, direction, acl_type,
local_port, protocol, remote_address):
port, found = self._get_switch_port_allocation(switch_port_name, False)
if not found:
return
# Add the ACLs only if they don't already exist
acls = port.associators(wmi_result_class=self._PORT_EXT_ACL_SET_DATA)
weight = self._get_new_weight(acls)
self._bind_security_rule(
port, direction, acl_type, self._ACL_ACTION_ALLOW, local_port,
protocol, remote_address, weight)
def remove_security_rule(self, switch_port_name, direction, acl_type,
local_port, protocol, remote_address):
port, found = self._get_switch_port_allocation(switch_port_name, False)
if not found:
# Port not found. It happens when the VM was already deleted.
return
acls = port.associators(wmi_result_class=self._PORT_EXT_ACL_SET_DATA)
filtered_acls = self._filter_security_acls(
acls, self._ACL_ACTION_ALLOW, direction, acl_type, local_port,
protocol, remote_address)
for acl in filtered_acls:
self._remove_virt_feature(acl)
def remove_all_security_rules(self, switch_port_name):
port, found = self._get_switch_port_allocation(switch_port_name, False)
if not found:
# Port not found. It happens when the VM was already deleted.
return
acls = port.associators(wmi_result_class=self._PORT_EXT_ACL_SET_DATA)
filtered_acls = [a for a in acls if
a.Action is not self._ACL_ACTION_METER]
if filtered_acls:
self._remove_multiple_virt_features(filtered_acls)
def create_default_reject_all_rules(self, switch_port_name):
port, found = self._get_switch_port_allocation(switch_port_name, False)
if not found:
raise utils.HyperVException(
msg=_('Port Allocation not found: %s') % switch_port_name)
acls = port.associators(wmi_result_class=self._PORT_EXT_ACL_SET_DATA)
filtered_acls = [v for v in acls if v.Action == self._ACL_ACTION_DENY]
if len(filtered_acls) >= self._REJECT_ACLS_COUNT:
return
for acl in filtered_acls:
self._remove_virt_feature(acl)
weight = 0
ipv4_pair = (self._ACL_TYPE_IPV4, self._IPV4_ANY)
ipv6_pair = (self._ACL_TYPE_IPV6, self._IPV6_ANY)
for direction in [self._ACL_DIR_IN, self._ACL_DIR_OUT]:
for acl_type, address in [ipv4_pair, ipv6_pair]:
for protocol in [self._TCP_PROTOCOL,
self._UDP_PROTOCOL,
self._ICMP_PROTOCOL]:
self._bind_security_rule(
port, direction, acl_type, self._ACL_ACTION_DENY,
self._ACL_DEFAULT, protocol, address, weight)
weight += 1
def _bind_security_rule(self, port, direction, acl_type, action,
local_port, protocol, remote_address, weight):
acls = port.associators(wmi_result_class=self._PORT_EXT_ACL_SET_DATA)
filtered_acls = self._filter_security_acls(
acls, action, direction, acl_type, local_port, protocol,
remote_address)
for acl in filtered_acls:
self._remove_virt_feature(acl)
acl = self._create_security_acl(
direction, acl_type, action, local_port, protocol, remote_address,
weight)
self._add_virt_feature(port, acl)
def _create_acl(self, direction, acl_type, action):
acl = self._get_default_setting_data(self._PORT_ALLOC_ACL_SET_DATA)
acl.set(Direction=direction,
AclType=acl_type,
Action=action,
Applicability=self._ACL_APPLICABILITY_LOCAL)
return acl
def _create_security_acl(self, direction, acl_type, action, local_port,
protocol, remote_ip_address, weight):
acl = self._create_acl(direction, acl_type, action)
(remote_address, remote_prefix_length) = remote_ip_address.split('/')
acl.set(Applicability=self._ACL_APPLICABILITY_REMOTE,
RemoteAddress=remote_address,
RemoteAddressPrefixLength=remote_prefix_length)
return acl
def _filter_acls(self, acls, action, direction, acl_type, remote_addr=""):
return [v for v in acls
if v.Action == action and
v.Direction == direction and
v.AclType == acl_type and
v.RemoteAddress == remote_addr]
def _filter_security_acls(self, acls, acl_action, direction, acl_type,
local_port, protocol, remote_addr=""):
(remote_address, remote_prefix_length) = remote_addr.split('/')
remote_prefix_length = int(remote_prefix_length)
return [v for v in acls
if v.Direction == direction and
v.Action in [self._ACL_ACTION_ALLOW, self._ACL_ACTION_DENY] and
v.AclType == acl_type and
v.RemoteAddress == remote_address and
v.RemoteAddressPrefixLength == remote_prefix_length]
def _get_new_weight(self, acls):
return 0
class HyperVUtilsV2R2(HyperVUtilsV2):
_PORT_EXT_ACL_SET_DATA = 'Msvm_EthernetSwitchPortExtendedAclSettingData'
_MAX_WEIGHT = 65500
# 2 directions x 2 address types x 3 protocols = 12 ACLs
_REJECT_ACLS_COUNT = 12
def _create_security_acl(self, direction, acl_type, action, local_port,
protocol, remote_addr, weight):
acl = self._get_default_setting_data(self._PORT_EXT_ACL_SET_DATA)
acl.set(Direction=direction,
Action=action,
LocalPort=str(local_port),
Protocol=protocol,
RemoteIPAddress=remote_addr,
IdleSessionTimeout=0,
Weight=weight)
return acl
def _filter_security_acls(self, acls, action, direction, acl_type,
local_port, protocol, remote_addr=""):
return [v for v in acls
if v.Action == action and
v.Direction == direction and
v.LocalPort == str(local_port) and
v.Protocol == protocol and
v.RemoteIPAddress == remote_addr]
def _get_new_weight(self, acls):
acls = [a for a in acls if a.Action is not self._ACL_ACTION_DENY]
if not acls:
return self._MAX_WEIGHT - 1
weights = [a.Weight for a in acls]
min_weight = min(weights)
for weight in range(min_weight, self._MAX_WEIGHT):
if weight not in weights:
return weight
return min_weight - 1

View File

@ -1,20 +0,0 @@
# Copyright 2013 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Topic for tunnel notifications between the plugin and agent
TUNNEL = 'tunnel'
# Special vlan_id value in ovs_vlan_allocations table indicating flat network
FLAT_VLAN_ID = -1

View File

@ -0,0 +1,22 @@
Hyper-V Neutron Agent and ML2 Mechanism Driver for ML2 Plugin
=============================================================
In order to properly use the Hyper-V Neutron Agent, neutron will have to use
the Ml2Plugin. This can be done by setting the ``core_plugin`` field in
``neutron.conf`` to:
::
core_plugin = neutron.plugins.ml2.plugin.Ml2Plugin
Additionally, the ML2 Plugin must be configured to use the Hyper-V Mechanism
Driver, by adding it to the ``mechanism_drivers`` field in ``ml2_conf.ini``:
::
[ml2]
mechanism_drivers = openvswitch,hyperv
# any other mechanism_drivers can be added to the list.
Currently, the mechanism driver supports the following network types: local,
flat, VLAN.

View File

@ -13,19 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import re
from oslo_log import log
from hyperv.neutron.ml2 import mech_hyperv
from neutron.common import constants
from neutron.extensions import portbindings
from neutron.plugins.common import constants as p_constants
from neutron.plugins.ml2.drivers import mech_agent
LOG = log.getLogger(__name__)
class HypervMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
class HypervMechanismDriver(mech_hyperv.HypervMechanismDriver,
mech_agent.SimpleAgentMechanismDriverBase):
"""Attach to networks using hyperv L2 agent.
The HypervMechanismDriver integrates the ml2 plugin with the
@ -39,13 +35,3 @@ class HypervMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
constants.AGENT_TYPE_HYPERV,
portbindings.VIF_TYPE_HYPERV,
{portbindings.CAP_PORT_FILTER: False})
def get_allowed_network_types(self, agent=None):
return [p_constants.TYPE_LOCAL, p_constants.TYPE_FLAT,
p_constants.TYPE_VLAN]
def get_mappings(self, agent):
return agent['configurations'].get('vswitch_mappings', {})
def physnet_in_mappings(self, physnet, mappings):
return any(re.match(pattern, physnet) for pattern in mappings)

View File

@ -0,0 +1,6 @@
# The order of packages is significant, because pip processes them in the
# order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
networking-hyperv>=1.0.0

View File

@ -1,222 +0,0 @@
# Copyright 2013 Cloudbase Solutions SRL
# Copyright 2013 Pedro Navarro Perez
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for Windows Hyper-V virtual switch neutron driver
"""
import mock
from oslo_config import cfg
from neutron.plugins.hyperv.agent import hyperv_neutron_agent
from neutron.plugins.hyperv.agent import utilsfactory
from neutron.tests import base
cfg.CONF.import_opt('enable_metrics_collection',
'neutron.plugins.hyperv.agent.hyperv_neutron_agent',
'AGENT')
class TestHyperVNeutronAgent(base.BaseTestCase):
_FAKE_PORT_ID = 'fake_port_id'
def setUp(self):
super(TestHyperVNeutronAgent, self).setUp()
utilsfactory._get_windows_version = mock.MagicMock(
return_value='6.2.0')
class MockFixedIntervalLoopingCall(object):
def __init__(self, f):
self.f = f
def start(self, interval=0):
self.f()
mock.patch('neutron.openstack.common.loopingcall.'
'FixedIntervalLoopingCall',
new=MockFixedIntervalLoopingCall).start()
cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver',
group='SECURITYGROUP')
self.agent = hyperv_neutron_agent.HyperVNeutronAgent()
self.agent.plugin_rpc = mock.Mock()
self.agent.sec_groups_agent = mock.MagicMock()
self.agent.sg_plugin_rpc = mock.Mock()
self.agent.context = mock.Mock()
self.agent.agent_id = mock.Mock()
fake_agent_state = {
'binary': 'neutron-hyperv-agent',
'host': 'fake_host_name',
'topic': 'N/A',
'configurations': {'vswitch_mappings': ['*:MyVirtualSwitch']},
'agent_type': 'HyperV agent',
'start_flag': True}
self.agent_state = fake_agent_state
def test_use_enhanced_rpc(self):
self.agent.sec_groups_agent = hyperv_neutron_agent.HyperVSecurityAgent(
self.agent.context, self.agent.sg_plugin_rpc)
self.assertFalse(self.agent.sec_groups_agent.use_enhanced_rpc)
def test_port_bound_enable_metrics(self):
cfg.CONF.set_override('enable_metrics_collection', True, 'AGENT')
self._test_port_bound(True)
def test_port_bound_no_metrics(self):
cfg.CONF.set_override('enable_metrics_collection', False, 'AGENT')
self._test_port_bound(False)
def _test_port_bound(self, enable_metrics):
port = mock.MagicMock()
mock_enable_metrics = mock.MagicMock()
net_uuid = 'my-net-uuid'
with mock.patch.multiple(
self.agent._utils,
connect_vnic_to_vswitch=mock.MagicMock(),
set_vswitch_port_vlan_id=mock.MagicMock(),
enable_port_metrics_collection=mock_enable_metrics):
self.agent._port_bound(port, net_uuid, 'vlan', None, None)
self.assertEqual(enable_metrics, mock_enable_metrics.called)
def test_port_unbound(self):
map = {
'network_type': 'vlan',
'vswitch_name': 'fake-vswitch',
'ports': [],
'vlan_id': 1}
net_uuid = 'my-net-uuid'
network_vswitch_map = (net_uuid, map)
with mock.patch.object(self.agent,
'_get_network_vswitch_map_by_port_id',
return_value=network_vswitch_map):
with mock.patch.object(
self.agent._utils,
'disconnect_switch_port'):
self.agent._port_unbound(net_uuid)
def test_port_enable_control_metrics_ok(self):
cfg.CONF.set_override('enable_metrics_collection', True, 'AGENT')
self.agent._port_metric_retries[self._FAKE_PORT_ID] = (
cfg.CONF.AGENT.metrics_max_retries)
with mock.patch.multiple(self.agent._utils,
can_enable_control_metrics=mock.MagicMock(),
enable_control_metrics=mock.MagicMock()):
self.agent._utils.can_enable_control_metrics.return_value = True
self.agent._port_enable_control_metrics()
self.agent._utils.enable_control_metrics.assert_called_with(
self._FAKE_PORT_ID)
self.assertNotIn(self._FAKE_PORT_ID, self.agent._port_metric_retries)
def test_port_enable_control_metrics_maxed(self):
cfg.CONF.set_override('enable_metrics_collection', True, 'AGENT')
cfg.CONF.set_override('metrics_max_retries', 3, 'AGENT')
self.agent._port_metric_retries[self._FAKE_PORT_ID] = (
cfg.CONF.AGENT.metrics_max_retries)
with mock.patch.multiple(self.agent._utils,
can_enable_control_metrics=mock.MagicMock(),
enable_control_metrics=mock.MagicMock()):
self.agent._utils.can_enable_control_metrics.return_value = False
for i in range(cfg.CONF.AGENT.metrics_max_retries + 1):
self.assertIn(self._FAKE_PORT_ID,
self.agent._port_metric_retries)
self.agent._port_enable_control_metrics()
self.assertNotIn(self._FAKE_PORT_ID, self.agent._port_metric_retries)
def test_treat_devices_added_returns_true_for_missing_device(self):
attrs = {'get_devices_details_list.side_effect': Exception()}
self.agent.plugin_rpc.configure_mock(**attrs)
self.assertTrue(self.agent._treat_devices_added([{}]))
def mock_treat_devices_added(self, details, func_name):
"""Mock treat devices added.
:param details: the details to return for the device
:param func_name: the function that should be called
:returns: whether the named function was called
"""
attrs = {'get_devices_details_list.return_value': [details]}
self.agent.plugin_rpc.configure_mock(**attrs)
with mock.patch.object(self.agent, func_name) as func:
self.assertFalse(self.agent._treat_devices_added([{}]))
return func.called
def test_treat_devices_added_updates_known_port(self):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
with mock.patch.object(self.agent.plugin_rpc,
"update_device_up") as func:
self.assertTrue(self.mock_treat_devices_added(details,
'_treat_vif_port'))
self.assertTrue(func.called)
def test_treat_devices_added_missing_port_id(self):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: False
with mock.patch.object(self.agent.plugin_rpc,
"update_device_up") as func:
self.assertFalse(self.mock_treat_devices_added(details,
'_treat_vif_port'))
self.assertFalse(func.called)
def test_treat_devices_removed_returns_true_for_missing_device(self):
attrs = {'update_device_down.side_effect': Exception()}
self.agent.plugin_rpc.configure_mock(**attrs)
self.assertTrue(self.agent._treat_devices_removed([{}]))
def mock_treat_devices_removed(self, port_exists):
details = dict(exists=port_exists)
attrs = {'update_device_down.return_value': details}
self.agent.plugin_rpc.configure_mock(**attrs)
with mock.patch.object(self.agent, '_port_unbound') as func:
self.assertFalse(self.agent._treat_devices_removed([{}]))
self.assertEqual(func.called, not port_exists)
def test_treat_devices_removed_unbinds_port(self):
self.mock_treat_devices_removed(False)
def test_treat_devices_removed_ignores_missing_port(self):
self.mock_treat_devices_removed(False)
def test_report_state(self):
with mock.patch.object(self.agent.state_rpc,
"report_state") as report_st:
self.agent._report_state()
report_st.assert_called_with(self.agent.context,
self.agent.agent_state)
self.assertNotIn("start_flag", self.agent.agent_state)
def test_main(self):
with mock.patch.object(hyperv_neutron_agent,
'HyperVNeutronAgent') as plugin:
with mock.patch.object(hyperv_neutron_agent,
'common_config') as common_config:
hyperv_neutron_agent.main()
self.assertTrue(common_config.init.called)
self.assertTrue(common_config.setup_logging.called)
plugin.assert_has_calls([mock.call().daemon_loop()])

View File

@ -1,188 +0,0 @@
# Copyright 2014 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for the Hyper-V Security Groups Driver.
"""
import mock
from oslo_config import cfg
from neutron.plugins.hyperv.agent import security_groups_driver as sg_driver
from neutron.plugins.hyperv.agent import utilsfactory
from neutron.tests import base
CONF = cfg.CONF
class TestHyperVSecurityGroupsDriver(base.BaseTestCase):
_FAKE_DEVICE = 'fake_device'
_FAKE_ID = 'fake_id'
_FAKE_DIRECTION = 'ingress'
_FAKE_ETHERTYPE = 'IPv4'
_FAKE_ETHERTYPE_IPV6 = 'IPv6'
_FAKE_DEST_IP_PREFIX = 'fake_dest_ip_prefix'
_FAKE_SOURCE_IP_PREFIX = 'fake_source_ip_prefix'
_FAKE_PARAM_NAME = 'fake_param_name'
_FAKE_PARAM_VALUE = 'fake_param_value'
_FAKE_PORT_MIN = 9001
_FAKE_PORT_MAX = 9011
def setUp(self):
super(TestHyperVSecurityGroupsDriver, self).setUp()
self._mock_windows_version = mock.patch.object(utilsfactory,
'get_hypervutils')
self._mock_windows_version.start()
self._driver = sg_driver.HyperVSecurityGroupsDriver()
self._driver._utils = mock.MagicMock()
@mock.patch('neutron.plugins.hyperv.agent.security_groups_driver'
'.HyperVSecurityGroupsDriver._create_port_rules')
def test_prepare_port_filter(self, mock_create_rules):
mock_port = self._get_port()
mock_utils_method = self._driver._utils.create_default_reject_all_rules
self._driver.prepare_port_filter(mock_port)
self.assertEqual(mock_port,
self._driver._security_ports[self._FAKE_DEVICE])
mock_utils_method.assert_called_once_with(self._FAKE_ID)
self._driver._create_port_rules.assert_called_once_with(
self._FAKE_ID, mock_port['security_group_rules'])
def test_update_port_filter(self):
mock_port = self._get_port()
new_mock_port = self._get_port()
new_mock_port['id'] += '2'
new_mock_port['security_group_rules'][0]['ethertype'] += "2"
self._driver._security_ports[mock_port['device']] = mock_port
self._driver._create_port_rules = mock.MagicMock()
self._driver._remove_port_rules = mock.MagicMock()
self._driver.update_port_filter(new_mock_port)
self._driver._remove_port_rules.assert_called_once_with(
mock_port['id'], mock_port['security_group_rules'])
self._driver._create_port_rules.assert_called_once_with(
new_mock_port['id'], new_mock_port['security_group_rules'])
self.assertEqual(new_mock_port,
self._driver._security_ports[new_mock_port['device']])
@mock.patch('neutron.plugins.hyperv.agent.security_groups_driver'
'.HyperVSecurityGroupsDriver.prepare_port_filter')
def test_update_port_filter_new_port(self, mock_method):
mock_port = self._get_port()
self._driver.prepare_port_filter = mock.MagicMock()
self._driver.update_port_filter(mock_port)
self._driver.prepare_port_filter.assert_called_once_with(mock_port)
def test_remove_port_filter(self):
mock_port = self._get_port()
self._driver._security_ports[mock_port['device']] = mock_port
self._driver.remove_port_filter(mock_port)
self.assertFalse(mock_port['device'] in self._driver._security_ports)
def test_create_port_rules_exception(self):
fake_rule = self._create_security_rule()
self._driver._utils.create_security_rule.side_effect = Exception(
'Generated Exception for testing.')
self._driver._create_port_rules(self._FAKE_ID, [fake_rule])
def test_create_param_map(self):
fake_rule = self._create_security_rule()
self._driver._get_rule_remote_address = mock.MagicMock(
return_value=self._FAKE_SOURCE_IP_PREFIX)
actual = self._driver._create_param_map(fake_rule)
expected = {
'direction': self._driver._ACL_PROP_MAP[
'direction'][self._FAKE_DIRECTION],
'acl_type': self._driver._ACL_PROP_MAP[
'ethertype'][self._FAKE_ETHERTYPE],
'local_port': '%s-%s' % (self._FAKE_PORT_MIN, self._FAKE_PORT_MAX),
'protocol': self._driver._ACL_PROP_MAP['default'],
'remote_address': self._FAKE_SOURCE_IP_PREFIX
}
self.assertEqual(expected, actual)
@mock.patch('neutron.plugins.hyperv.agent.security_groups_driver'
'.HyperVSecurityGroupsDriver._create_param_map')
def test_create_port_rules(self, mock_method):
fake_rule = self._create_security_rule()
mock_method.return_value = {
self._FAKE_PARAM_NAME: self._FAKE_PARAM_VALUE}
self._driver._create_port_rules(self._FAKE_ID, [fake_rule])
self._driver._utils.create_security_rule.assert_called_once_with(
self._FAKE_ID, fake_param_name=self._FAKE_PARAM_VALUE)
def test_convert_any_address_to_same_ingress(self):
rule = self._create_security_rule()
actual = self._driver._get_rule_remote_address(rule)
self.assertEqual(self._FAKE_SOURCE_IP_PREFIX, actual)
def test_convert_any_address_to_same_egress(self):
rule = self._create_security_rule()
rule['direction'] += '2'
actual = self._driver._get_rule_remote_address(rule)
self.assertEqual(self._FAKE_DEST_IP_PREFIX, actual)
def test_convert_any_address_to_ipv4(self):
rule = self._create_security_rule()
del rule['source_ip_prefix']
actual = self._driver._get_rule_remote_address(rule)
self.assertEqual(self._driver._ACL_PROP_MAP['address_default']['IPv4'],
actual)
def test_convert_any_address_to_ipv6(self):
rule = self._create_security_rule()
del rule['source_ip_prefix']
rule['ethertype'] = self._FAKE_ETHERTYPE_IPV6
actual = self._driver._get_rule_remote_address(rule)
self.assertEqual(self._driver._ACL_PROP_MAP['address_default']['IPv6'],
actual)
def test_get_rule_protocol_icmp(self):
self._test_get_rule_protocol(
'icmp', self._driver._ACL_PROP_MAP['protocol']['icmp'])
def test_get_rule_protocol_no_icmp(self):
self._test_get_rule_protocol('tcp', 'tcp')
def _test_get_rule_protocol(self, protocol, expected):
rule = self._create_security_rule()
rule['protocol'] = protocol
actual = self._driver._get_rule_protocol(rule)
self.assertEqual(expected, actual)
def _get_port(self):
return {
'device': self._FAKE_DEVICE,
'id': self._FAKE_ID,
'security_group_rules': [self._create_security_rule()]
}
def _create_security_rule(self):
return {
'direction': self._FAKE_DIRECTION,
'ethertype': self._FAKE_ETHERTYPE,
'dest_ip_prefix': self._FAKE_DEST_IP_PREFIX,
'source_ip_prefix': self._FAKE_SOURCE_IP_PREFIX,
'port_range_min': self._FAKE_PORT_MIN,
'port_range_max': self._FAKE_PORT_MAX
}

View File

@ -1,51 +0,0 @@
# Copyright 2013 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for the Hyper-V utils factory.
"""
import mock
from oslo_config import cfg
from neutron.plugins.hyperv.agent import utils
from neutron.plugins.hyperv.agent import utilsfactory
from neutron.plugins.hyperv.agent import utilsv2
from neutron.tests import base
CONF = cfg.CONF
class TestHyperVUtilsFactory(base.BaseTestCase):
def test_get_hypervutils_v2_r2(self):
self._test_returned_class(utilsv2.HyperVUtilsV2R2, True, '6.3.0')
def test_get_hypervutils_v2(self):
self._test_returned_class(utilsv2.HyperVUtilsV2, False, '6.2.0')
def test_get_hypervutils_v1_old_version(self):
self._test_returned_class(utils.HyperVUtils, False, '6.1.0')
def test_get_hypervutils_v1_forced(self):
self._test_returned_class(utils.HyperVUtils, True, '6.2.0')
def _test_returned_class(self, expected_class, force_v1, os_version):
CONF.hyperv.force_hyperv_utils_v1 = force_v1
utilsfactory._get_windows_version = mock.MagicMock(
return_value=os_version)
actual_class = type(utilsfactory.get_hypervutils())
self.assertEqual(actual_class, expected_class)

View File

@ -1,516 +0,0 @@
# Copyright 2013 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for the Hyper-V utils V2.
"""
import mock
from neutron.plugins.hyperv.agent import utils
from neutron.plugins.hyperv.agent import utilsv2
from neutron.tests import base
class TestHyperVUtilsV2(base.BaseTestCase):
_FAKE_VSWITCH_NAME = "fake_vswitch_name"
_FAKE_PORT_NAME = "fake_port_name"
_FAKE_JOB_PATH = 'fake_job_path'
_FAKE_RET_VAL = 0
_FAKE_VM_PATH = "fake_vm_path"
_FAKE_RES_DATA = "fake_res_data"
_FAKE_RES_PATH = "fake_res_path"
_FAKE_VSWITCH = "fake_vswitch"
_FAKE_VLAN_ID = "fake_vlan_id"
_FAKE_CLASS_NAME = "fake_class_name"
_FAKE_ELEMENT_NAME = "fake_element_name"
_FAKE_HYPERV_VM_STATE = 'fake_hyperv_state'
_FAKE_ACL_ACT = 'fake_acl_action'
_FAKE_ACL_DIR = 'fake_acl_dir'
_FAKE_ACL_TYPE = 'fake_acl_type'
_FAKE_LOCAL_PORT = 'fake_local_port'
_FAKE_PROTOCOL = 'fake_port_protocol'
_FAKE_REMOTE_ADDR = '0.0.0.0/0'
_FAKE_WEIGHT = 'fake_weight'
def setUp(self):
super(TestHyperVUtilsV2, self).setUp()
self._utils = utilsv2.HyperVUtilsV2()
self._utils._wmi_conn = mock.MagicMock()
def test_connect_vnic_to_vswitch_found(self):
self._test_connect_vnic_to_vswitch(True)
def test_connect_vnic_to_vswitch_not_found(self):
self._test_connect_vnic_to_vswitch(False)
def _test_connect_vnic_to_vswitch(self, found):
self._utils._get_vnic_settings = mock.MagicMock()
if not found:
mock_vm = mock.MagicMock()
self._utils._get_vm_from_res_setting_data = mock.MagicMock(
return_value=mock_vm)
self._utils._add_virt_resource = mock.MagicMock()
else:
self._utils._modify_virt_resource = mock.MagicMock()
self._utils._get_vswitch = mock.MagicMock()
self._utils._get_switch_port_allocation = mock.MagicMock()
mock_port = mock.MagicMock()
self._utils._get_switch_port_allocation.return_value = (mock_port,
found)
self._utils.connect_vnic_to_vswitch(self._FAKE_VSWITCH_NAME,
self._FAKE_PORT_NAME)
if not found:
self._utils._add_virt_resource.assert_called_with(mock_vm,
mock_port)
else:
self._utils._modify_virt_resource.assert_called_with(mock_port)
def test_add_virt_resource(self):
self._test_virt_method('AddResourceSettings', 3, '_add_virt_resource',
True, self._FAKE_VM_PATH, [self._FAKE_RES_DATA])
def test_add_virt_feature(self):
self._test_virt_method('AddFeatureSettings', 3, '_add_virt_feature',
True, self._FAKE_VM_PATH, [self._FAKE_RES_DATA])
def test_modify_virt_resource(self):
self._test_virt_method('ModifyResourceSettings', 3,
'_modify_virt_resource', False,
ResourceSettings=[self._FAKE_RES_DATA])
def test_remove_virt_resource(self):
self._test_virt_method('RemoveResourceSettings', 2,
'_remove_virt_resource', False,
ResourceSettings=[self._FAKE_RES_PATH])
def test_remove_virt_feature(self):
self._test_virt_method('RemoveFeatureSettings', 2,
'_remove_virt_feature', False,
FeatureSettings=[self._FAKE_RES_PATH])
def _test_virt_method(self, vsms_method_name, return_count,
utils_method_name, with_mock_vm, *args, **kwargs):
mock_svc = self._utils._conn.Msvm_VirtualSystemManagementService()[0]
vsms_method = getattr(mock_svc, vsms_method_name)
mock_rsd = self._mock_vsms_method(vsms_method, return_count)
if with_mock_vm:
mock_vm = mock.MagicMock()
mock_vm.path_.return_value = self._FAKE_VM_PATH
getattr(self._utils, utils_method_name)(mock_vm, mock_rsd)
else:
getattr(self._utils, utils_method_name)(mock_rsd)
if args:
vsms_method.assert_called_once_with(*args)
else:
vsms_method.assert_called_once_with(**kwargs)
def _mock_vsms_method(self, vsms_method, return_count):
args = None
if return_count == 3:
args = (self._FAKE_JOB_PATH, mock.MagicMock(), self._FAKE_RET_VAL)
else:
args = (self._FAKE_JOB_PATH, self._FAKE_RET_VAL)
vsms_method.return_value = args
mock_res_setting_data = mock.MagicMock()
mock_res_setting_data.GetText_.return_value = self._FAKE_RES_DATA
mock_res_setting_data.path_.return_value = self._FAKE_RES_PATH
self._utils._check_job_status = mock.MagicMock()
return mock_res_setting_data
def test_disconnect_switch_port_delete_port(self):
self._test_disconnect_switch_port(True)
def test_disconnect_switch_port_modify_port(self):
self._test_disconnect_switch_port(False)
def _test_disconnect_switch_port(self, delete_port):
self._utils._get_switch_port_allocation = mock.MagicMock()
mock_sw_port = mock.MagicMock()
self._utils._get_switch_port_allocation.return_value = (mock_sw_port,
True)
if delete_port:
self._utils._remove_virt_resource = mock.MagicMock()
else:
self._utils._modify_virt_resource = mock.MagicMock()
self._utils.disconnect_switch_port(self._FAKE_VSWITCH_NAME,
self._FAKE_PORT_NAME,
delete_port)
if delete_port:
self._utils._remove_virt_resource.assert_called_with(mock_sw_port)
else:
self._utils._modify_virt_resource.assert_called_with(mock_sw_port)
def test_get_vswitch(self):
self._utils._conn.Msvm_VirtualEthernetSwitch.return_value = [
self._FAKE_VSWITCH]
vswitch = self._utils._get_vswitch(self._FAKE_VSWITCH_NAME)
self.assertEqual(self._FAKE_VSWITCH, vswitch)
def test_get_vswitch_not_found(self):
self._utils._conn.Msvm_VirtualEthernetSwitch.return_value = []
self.assertRaises(utils.HyperVException, self._utils._get_vswitch,
self._FAKE_VSWITCH_NAME)
def test_get_vswitch_external_port(self):
mock_vswitch = mock.MagicMock()
mock_sw_port = mock.MagicMock()
mock_vswitch.associators.return_value = [mock_sw_port]
mock_le = mock_sw_port.associators.return_value
mock_le.__len__.return_value = 1
mock_le1 = mock_le[0].associators.return_value
mock_le1.__len__.return_value = 1
vswitch_port = self._utils._get_vswitch_external_port(mock_vswitch)
self.assertEqual(mock_sw_port, vswitch_port)
def test_set_vswitch_port_vlan_id(self):
mock_port_alloc = mock.MagicMock()
self._utils._get_switch_port_allocation = mock.MagicMock(return_value=(
mock_port_alloc, True))
self._utils._get_vlan_setting_data_from_port_alloc = mock.MagicMock()
mock_svc = self._utils._conn.Msvm_VirtualSystemManagementService()[0]
mock_svc.RemoveFeatureSettings.return_value = (self._FAKE_JOB_PATH,
self._FAKE_RET_VAL)
mock_vlan_settings = mock.MagicMock()
self._utils._get_vlan_setting_data = mock.MagicMock(return_value=(
mock_vlan_settings, True))
mock_svc.AddFeatureSettings.return_value = (self._FAKE_JOB_PATH,
None,
self._FAKE_RET_VAL)
self._utils.set_vswitch_port_vlan_id(self._FAKE_VLAN_ID,
self._FAKE_PORT_NAME)
self.assertTrue(mock_svc.RemoveFeatureSettings.called)
self.assertTrue(mock_svc.AddFeatureSettings.called)
def test_get_setting_data(self):
self._utils._get_first_item = mock.MagicMock(return_value=None)
mock_data = mock.MagicMock()
self._utils._get_default_setting_data = mock.MagicMock(
return_value=mock_data)
ret_val = self._utils._get_setting_data(self._FAKE_CLASS_NAME,
self._FAKE_ELEMENT_NAME,
True)
self.assertEqual(ret_val, (mock_data, False))
def test_enable_port_metrics_collection(self):
mock_port = mock.MagicMock()
self._utils._get_switch_port_allocation = mock.MagicMock(return_value=(
mock_port, True))
mock_acl = mock.MagicMock()
with mock.patch.multiple(
self._utils,
_get_default_setting_data=mock.MagicMock(return_value=mock_acl),
_add_virt_feature=mock.MagicMock()):
self._utils.enable_port_metrics_collection(self._FAKE_PORT_NAME)
self.assertEqual(4, len(self._utils._add_virt_feature.mock_calls))
self._utils._add_virt_feature.assert_called_with(
mock_port, mock_acl)
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._get_switch_port_allocation')
def test_enable_control_metrics_ok(self, mock_get_port_allocation):
mock_metrics_svc = self._utils._conn.Msvm_MetricService()[0]
mock_metrics_def_source = self._utils._conn.CIM_BaseMetricDefinition
mock_metric_def = mock.MagicMock()
mock_port = mock.MagicMock()
mock_get_port_allocation.return_value = (mock_port, True)
mock_metrics_def_source.return_value = [mock_metric_def]
m_call = mock.call(Subject=mock_port.path_.return_value,
Definition=mock_metric_def.path_.return_value,
MetricCollectionEnabled=self._utils._METRIC_ENABLED)
self._utils.enable_control_metrics(self._FAKE_PORT_NAME)
mock_metrics_svc.ControlMetrics.assert_has_calls([m_call, m_call])
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._get_switch_port_allocation')
def test_enable_control_metrics_no_port(self, mock_get_port_allocation):
mock_metrics_svc = self._utils._conn.Msvm_MetricService()[0]
mock_get_port_allocation.return_value = (None, False)
self._utils.enable_control_metrics(self._FAKE_PORT_NAME)
self.assertEqual(0, mock_metrics_svc.ControlMetrics.call_count)
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._get_switch_port_allocation')
def test_enable_control_metrics_no_def(self, mock_get_port_allocation):
mock_metrics_svc = self._utils._conn.Msvm_MetricService()[0]
mock_metrics_def_source = self._utils._conn.CIM_BaseMetricDefinition
mock_port = mock.MagicMock()
mock_get_port_allocation.return_value = (mock_port, True)
mock_metrics_def_source.return_value = None
self._utils.enable_control_metrics(self._FAKE_PORT_NAME)
self.assertEqual(0, mock_metrics_svc.ControlMetrics.call_count)
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._is_port_vm_started')
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._get_switch_port_allocation')
def test_can_enable_control_metrics_true(self, mock_get, mock_is_started):
mock_acl = mock.MagicMock()
mock_acl.Action = self._utils._ACL_ACTION_METER
self._test_can_enable_control_metrics(mock_get, mock_is_started,
[mock_acl, mock_acl], True)
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._is_port_vm_started')
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._get_switch_port_allocation')
def test_can_enable_control_metrics_false(self, mock_get, mock_is_started):
self._test_can_enable_control_metrics(mock_get, mock_is_started, [],
False)
def _test_can_enable_control_metrics(self, mock_get_port, mock_vm_started,
acls, expected_result):
mock_port = mock.MagicMock()
mock_acl = mock.MagicMock()
mock_acl.Action = self._utils._ACL_ACTION_METER
mock_port.associators.return_value = acls
mock_get_port.return_value = (mock_port, True)
mock_vm_started.return_value = True
result = self._utils.can_enable_control_metrics(self._FAKE_PORT_NAME)
self.assertEqual(expected_result, result)
def test_is_port_vm_started_true(self):
self._test_is_port_vm_started(self._utils._HYPERV_VM_STATE_ENABLED,
True)
def test_is_port_vm_started_false(self):
self._test_is_port_vm_started(self._FAKE_HYPERV_VM_STATE, False)
def _test_is_port_vm_started(self, vm_state, expected_result):
mock_svc = self._utils._conn.Msvm_VirtualSystemManagementService()[0]
mock_port = mock.MagicMock()
mock_vmsettings = mock.MagicMock()
mock_summary = mock.MagicMock()
mock_summary.EnabledState = vm_state
mock_vmsettings.path_.return_value = self._FAKE_RES_PATH
mock_port.associators.return_value = [mock_vmsettings]
mock_svc.GetSummaryInformation.return_value = (self._FAKE_RET_VAL,
[mock_summary])
result = self._utils._is_port_vm_started(mock_port)
self.assertEqual(expected_result, result)
mock_svc.GetSummaryInformation.assert_called_once_with(
[self._utils._VM_SUMMARY_ENABLED_STATE],
[self._FAKE_RES_PATH])
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._remove_virt_feature')
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._bind_security_rule')
def test_create_default_reject_all_rules(self, mock_bind, mock_remove):
(m_port, m_acl) = self._setup_security_rule_test()
m_acl.Action = self._utils._ACL_ACTION_DENY
self._utils.create_default_reject_all_rules(self._FAKE_PORT_NAME)
calls = []
ipv4_pair = (self._utils._ACL_TYPE_IPV4, self._utils._IPV4_ANY)
ipv6_pair = (self._utils._ACL_TYPE_IPV6, self._utils._IPV6_ANY)
for direction in [self._utils._ACL_DIR_IN, self._utils._ACL_DIR_OUT]:
for acl_type, address in [ipv4_pair, ipv6_pair]:
for protocol in [self._utils._TCP_PROTOCOL,
self._utils._UDP_PROTOCOL,
self._utils._ICMP_PROTOCOL]:
calls.append(mock.call(m_port, direction, acl_type,
self._utils._ACL_ACTION_DENY,
self._utils._ACL_DEFAULT,
protocol, address, mock.ANY))
self._utils._remove_virt_feature.assert_called_once_with(m_acl)
self._utils._bind_security_rule.assert_has_calls(calls)
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._remove_virt_feature')
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._bind_security_rule')
def test_create_default_reject_all_rules_already_added(self, mock_bind,
mock_remove):
(m_port, m_acl) = self._setup_security_rule_test()
m_acl.Action = self._utils._ACL_ACTION_DENY
m_port.associators.return_value = [
m_acl] * self._utils._REJECT_ACLS_COUNT
self._utils.create_default_reject_all_rules(self._FAKE_PORT_NAME)
self.assertFalse(self._utils._remove_virt_feature.called)
self.assertFalse(self._utils._bind_security_rule.called)
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._remove_virt_feature')
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._add_virt_feature')
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._create_security_acl')
def test_bind_security_rule(self, mock_create_acl, mock_add, mock_remove):
(m_port, m_acl) = self._setup_security_rule_test()
mock_create_acl.return_value = m_acl
self._utils._bind_security_rule(
m_port, self._FAKE_ACL_DIR, self._FAKE_ACL_TYPE,
self._FAKE_ACL_ACT, self._FAKE_LOCAL_PORT, self._FAKE_PROTOCOL,
self._FAKE_REMOTE_ADDR, self._FAKE_WEIGHT)
self._utils._add_virt_feature.assert_called_once_with(m_port, m_acl)
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._remove_virt_feature')
def test_remove_security_rule(self, mock_remove_feature):
mock_acl = self._setup_security_rule_test()[1]
self._utils.remove_security_rule(
self._FAKE_PORT_NAME, self._FAKE_ACL_DIR, self._FAKE_ACL_TYPE,
self._FAKE_LOCAL_PORT, self._FAKE_PROTOCOL, self._FAKE_REMOTE_ADDR)
self._utils._remove_virt_feature.assert_called_once_with(mock_acl)
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._remove_multiple_virt_features')
def test_remove_all_security_rules(self, mock_remove_feature):
mock_acl = self._setup_security_rule_test()[1]
self._utils.remove_all_security_rules(self._FAKE_PORT_NAME)
self._utils._remove_multiple_virt_features.assert_called_once_with(
[mock_acl])
def _setup_security_rule_test(self):
mock_port = mock.MagicMock()
mock_acl = mock.MagicMock()
mock_port.associators.return_value = [mock_acl]
self._utils._get_switch_port_allocation = mock.MagicMock(return_value=(
mock_port, True))
self._utils._filter_security_acls = mock.MagicMock(
return_value=[mock_acl])
return (mock_port, mock_acl)
def test_filter_acls(self):
mock_acl = mock.MagicMock()
mock_acl.Action = self._FAKE_ACL_ACT
mock_acl.Applicability = self._utils._ACL_APPLICABILITY_LOCAL
mock_acl.Direction = self._FAKE_ACL_DIR
mock_acl.AclType = self._FAKE_ACL_TYPE
mock_acl.RemoteAddress = self._FAKE_REMOTE_ADDR
acls = [mock_acl, mock_acl]
good_acls = self._utils._filter_acls(
acls, self._FAKE_ACL_ACT, self._FAKE_ACL_DIR,
self._FAKE_ACL_TYPE, self._FAKE_REMOTE_ADDR)
bad_acls = self._utils._filter_acls(
acls, self._FAKE_ACL_ACT, self._FAKE_ACL_DIR, self._FAKE_ACL_TYPE)
self.assertEqual(acls, good_acls)
self.assertEqual([], bad_acls)
class TestHyperVUtilsV2R2(base.BaseTestCase):
_FAKE_ACL_ACT = 'fake_acl_action'
_FAKE_ACL_DIR = 'fake_direction'
_FAKE_ACL_TYPE = 'fake_acl_type'
_FAKE_LOCAL_PORT = 'fake_local_port'
_FAKE_PROTOCOL = 'fake_port_protocol'
_FAKE_REMOTE_ADDR = '10.0.0.0/0'
def setUp(self):
super(TestHyperVUtilsV2R2, self).setUp()
self._utils = utilsv2.HyperVUtilsV2R2()
def test_filter_security_acls(self):
self._test_filter_security_acls(
self._FAKE_LOCAL_PORT, self._FAKE_PROTOCOL, self._FAKE_REMOTE_ADDR)
def test_filter_security_acls_default(self):
default = self._utils._ACL_DEFAULT
self._test_filter_security_acls(
default, default, self._FAKE_REMOTE_ADDR)
def _test_filter_security_acls(self, local_port, protocol, remote_addr):
acls = []
default = self._utils._ACL_DEFAULT
for port, proto in [(default, default), (local_port, protocol)]:
mock_acl = mock.MagicMock()
mock_acl.Action = self._utils._ACL_ACTION_ALLOW
mock_acl.Direction = self._FAKE_ACL_DIR
mock_acl.LocalPort = port
mock_acl.Protocol = proto
mock_acl.RemoteIPAddress = remote_addr
acls.append(mock_acl)
right_acls = [a for a in acls if a.LocalPort == local_port]
good_acls = self._utils._filter_security_acls(
acls, mock_acl.Action, self._FAKE_ACL_DIR, self._FAKE_ACL_TYPE,
local_port, protocol, remote_addr)
bad_acls = self._utils._filter_security_acls(
acls, self._FAKE_ACL_ACT, self._FAKE_ACL_DIR, self._FAKE_ACL_TYPE,
local_port, protocol, remote_addr)
self.assertEqual(right_acls, good_acls)
self.assertEqual([], bad_acls)
def test_get_new_weight(self):
mockacl1 = mock.MagicMock()
mockacl1.Weight = self._utils._MAX_WEIGHT - 1
mockacl2 = mock.MagicMock()
mockacl2.Weight = self._utils._MAX_WEIGHT - 3
self.assertEqual(self._utils._MAX_WEIGHT - 2,
self._utils._get_new_weight([mockacl1, mockacl2]))
def test_get_new_weight_no_acls(self):
self.assertEqual(self._utils._MAX_WEIGHT - 1,
self._utils._get_new_weight([]))
def test_get_new_weight_default_acls(self):
mockacl1 = mock.MagicMock()
mockacl1.Weight = self._utils._MAX_WEIGHT - 1
mockacl2 = mock.MagicMock()
mockacl2.Weight = self._utils._MAX_WEIGHT - 2
mockacl2.Action = self._utils._ACL_ACTION_DENY
self.assertEqual(self._utils._MAX_WEIGHT - 2,
self._utils._get_new_weight([mockacl1, mockacl2]))

View File

@ -1,69 +0,0 @@
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import constants
from neutron.extensions import portbindings
from neutron.plugins.ml2.drivers import mech_hyperv
from neutron.tests.unit.ml2 import _test_mech_agent as base
class HypervMechanismBaseTestCase(base.AgentMechanismBaseTestCase):
VIF_TYPE = portbindings.VIF_TYPE_HYPERV
CAP_PORT_FILTER = False
AGENT_TYPE = constants.AGENT_TYPE_HYPERV
GOOD_MAPPINGS = {'fake_physical_network': 'fake_vswitch'}
GOOD_CONFIGS = {'vswitch_mappings': GOOD_MAPPINGS}
BAD_MAPPINGS = {'wrong_physical_network': 'wrong_vswitch'}
BAD_CONFIGS = {'vswitch_mappings': BAD_MAPPINGS}
AGENTS = [{'alive': True,
'configurations': GOOD_CONFIGS,
'host': 'host'}]
AGENTS_DEAD = [{'alive': False,
'configurations': GOOD_CONFIGS,
'host': 'dead_host'}]
AGENTS_BAD = [{'alive': False,
'configurations': GOOD_CONFIGS,
'host': 'bad_host_1'},
{'alive': True,
'configurations': BAD_CONFIGS,
'host': 'bad_host_2'}]
def setUp(self):
super(HypervMechanismBaseTestCase, self).setUp()
self.driver = mech_hyperv.HypervMechanismDriver()
self.driver.initialize()
class HypervMechanismGenericTestCase(HypervMechanismBaseTestCase,
base.AgentMechanismGenericTestCase):
pass
class HypervMechanismLocalTestCase(HypervMechanismBaseTestCase,
base.AgentMechanismLocalTestCase):
pass
class HypervMechanismFlatTestCase(HypervMechanismBaseTestCase,
base.AgentMechanismFlatTestCase):
pass
class HypervMechanismVlanTestCase(HypervMechanismBaseTestCase,
base.AgentMechanismVlanTestCase):
pass

View File

@ -93,7 +93,7 @@ console_scripts =
neutron-db-manage = neutron.db.migration.cli:main
neutron-debug = neutron.debug.shell:main
neutron-dhcp-agent = neutron.cmd.eventlet.agents.dhcp:main
neutron-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main
neutron-hyperv-agent = neutron.cmd.eventlet.plugins.hyperv_neutron_agent:main
neutron-keepalived-state-change = neutron.cmd.keepalived_state_change:main
neutron-ibm-agent = neutron.plugins.ibm.agent.sdnve_neutron_agent:main
neutron-l3-agent = neutron.cmd.eventlet.agents.l3:main
@ -168,7 +168,7 @@ neutron.ml2.mechanism_drivers =
test = neutron.tests.unit.ml2.drivers.mechanism_test:TestMechanismDriver
linuxbridge = neutron.plugins.ml2.drivers.mech_linuxbridge:LinuxbridgeMechanismDriver
openvswitch = neutron.plugins.ml2.drivers.mech_openvswitch:OpenvswitchMechanismDriver
hyperv = neutron.plugins.ml2.drivers.mech_hyperv:HypervMechanismDriver
hyperv = neutron.plugins.ml2.drivers.hyperv.mech_hyperv:HypervMechanismDriver
arista = neutron.plugins.ml2.drivers.arista.mechanism_arista:AristaDriver
# Note: ncs and cisco_ncs point to the same driver entrypoint
# TODO: The old name (ncs) can be dropped when it is no longer used