Merge "Adds Hyper-V NVGRE support"

This commit is contained in:
Jenkins
2015-09-10 11:41:25 +00:00
committed by Gerrit Code Review
18 changed files with 1632 additions and 1 deletions

View File

@@ -14,7 +14,12 @@
# under the License.
# Topic for tunnel notifications between the plugin and agent
AGENT_TOPIC = 'q-agent-notifier'
TUNNEL = 'tunnel'
LOOKUP = 'lookup'
UPDATE = 'update'
# Special vlan_id value in ovs_vlan_allocations table indicating flat network
FLAT_VLAN_ID = -1
@@ -23,3 +28,6 @@ TRUNK_ENDPOINT_MODE = 5
TYPE_FLAT = 'flat'
TYPE_LOCAL = 'local'
TYPE_VLAN = 'vlan'
TYPE_NVGRE = 'gre'
IPV4_DEFAULT = '0.0.0.0'

View File

@@ -0,0 +1,66 @@
# Copyright 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from hyperv.neutron import constants
LOG = logging.getLogger(__name__)
def get_topic_name(prefix, table, operation):
"""Create a topic name.
The topic name needs to be synced between the agents.
The agent will send a fanout message to all of the listening agents
so that the agents in turn can perform their updates accordingly.
:param prefix: Common prefix for the agent message queues.
:param table: The table in question (TUNNEL, LOOKUP).
:param operation: The operation that invokes notification (UPDATE)
:returns: The topic name.
"""
return '%s-%s-%s' % (prefix, table, operation)
class AgentNotifierApi(object):
"""Agent side of the OpenVSwitch rpc API."""
def __init__(self, topic, client):
self._client = client
self.topic_tunnel_update = get_topic_name(topic,
constants.TUNNEL,
constants.UPDATE)
self.topic_lookup_update = get_topic_name(topic,
constants.LOOKUP,
constants.UPDATE)
def _fanout_cast(self, context, topic, method, **info):
cctxt = self._client.prepare(topic=topic, fanout=True)
cctxt.cast(context, method, **info)
def tunnel_update(self, context, tunnel_ip, tunnel_type):
self._fanout_cast(context,
self.topic_tunnel_update,
'tunnel_update',
tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type)
def lookup_update(self, context, lookup_ip, lookup_details):
self._fanout_cast(context,
self.topic_lookup_update,
'lookup_update',
lookup_ip=lookup_ip,
lookup_details=lookup_details)

View File

@@ -18,13 +18,34 @@ import collections
import re
import time
from oslo_config import cfg
from oslo_log import log as logging
from hyperv.common.i18n import _, _LE, _LI # noqa
from hyperv.neutron import constants
from hyperv.neutron import hyperv_agent_notifier
from hyperv.neutron import nvgre_ops
from hyperv.neutron import utils
from hyperv.neutron import utilsfactory
nvgre_opts = [
cfg.BoolOpt('enable_support',
default=False,
help=_('Enables Hyper-V NVGRE. '
'Requires Windows Server 2012 or above.')),
cfg.IntOpt('provider_vlan_id',
default=0,
help=_('Specifies the VLAN ID of the physical network, required'
' for setting the NVGRE Provider Address.')),
cfg.StrOpt('provider_tunnel_ip',
default=None,
help=_('Specifies the tunnel IP which will be used and '
'reported by this host for NVGRE networks.')),
]
CONF = cfg.CONF
CONF.register_opts(nvgre_opts, "NVGRE")
LOG = logging.getLogger(__name__)
@@ -57,6 +78,13 @@ networking-plugin-hyperv_agent.html
self._port_metric_retries = {}
self.plugin_rpc = None
self.context = None
self.client = None
self.connection = None
self.endpoints = None
self.topic = constants.AGENT_TOPIC
self._nvgre_enabled = False
conf = conf or {}
agent_conf = conf.get('AGENT', {})
@@ -89,8 +117,48 @@ networking-plugin-hyperv_agent.html
vswitch = parts[1].strip()
self._physical_network_mappings[pattern] = vswitch
def _init_nvgre(self):
# if NVGRE is enabled, self._nvgre_ops is required in order to properly
# set the agent state (see get_agent_configrations method).
if not CONF.NVGRE.enable_support:
return
if not CONF.NVGRE.provider_tunnel_ip:
err_msg = _('enable_nvgre_support is set to True, but provider '
'tunnel IP is not configured. Check neutron.conf '
'config file.')
LOG.error(err_msg)
raise utils.HyperVException(msg=err_msg)
self._nvgre_enabled = True
self._nvgre_ops = nvgre_ops.HyperVNvgreOps(
self._physical_network_mappings.values())
self._nvgre_ops.init_notifier(self.context, self.client)
self._nvgre_ops.tunnel_update(self.context,
CONF.NVGRE.provider_tunnel_ip,
constants.TYPE_NVGRE)
# setup Hyper-V Agent Lookup Record update consumer
topic = hyperv_agent_notifier.get_topic_name(
self.topic, constants.LOOKUP, constants.UPDATE)
self.connection.create_consumer(topic, self.endpoints, fanout=True)
# the created consumer is the last connection server.
# need to start it in order for it to consume.
self.connection.servers[-1].start()
def get_agent_configurations(self):
configurations = {'vswitch_mappings': self._physical_network_mappings}
if CONF.NVGRE.enable_support:
configurations['arp_responder_enabled'] = False
configurations['tunneling_ip'] = CONF.NVGRE.provider_tunnel_ip
configurations['devices'] = 1
configurations['l2_population'] = False
configurations['tunnel_types'] = [constants.TYPE_NVGRE]
configurations['enable_distributed_routing'] = False
configurations['bridge_mappings'] = {}
return configurations
def _get_vswitch_for_physical_network(self, phys_network_name):
@@ -135,6 +203,20 @@ networking-plugin-hyperv_agent.html
network_type, physical_network,
segmentation_id, port['admin_state_up'])
def tunnel_update(self, context, **kwargs):
LOG.info(_LI('tunnel_update received: kwargs: %s'), kwargs)
tunnel_ip = kwargs.get('tunnel_ip')
if tunnel_ip == CONF.NVGRE.provider_tunnel_ip:
# the notification should be ignored if it originates from this
# node.
return
tunnel_type = kwargs.get('tunnel_type')
self._nvgre_ops.tunnel_update(context, tunnel_ip, tunnel_type)
def lookup_update(self, context, **kwargs):
self._nvgre_ops.lookup_update(kwargs)
def _get_vswitch_name(self, network_type, physical_network):
if network_type != constants.TYPE_LOCAL:
vswitch_name = self._get_vswitch_for_physical_network(
@@ -153,6 +235,9 @@ networking-plugin-hyperv_agent.html
if network_type == constants.TYPE_VLAN:
self._utils.set_switch_external_port_trunk_vlan(
vswitch_name, segmentation_id, constants.TRUNK_ENDPOINT_MODE)
elif network_type == constants.TYPE_NVGRE and self._nvgre_enabled:
self._nvgre_ops.bind_nvgre_network(
segmentation_id, net_uuid, vswitch_name)
elif network_type == constants.TYPE_FLAT:
# Nothing to do
pass
@@ -201,6 +286,9 @@ networking-plugin-hyperv_agent.html
self._utils.set_vswitch_port_vlan_id(
segmentation_id,
port_id)
elif network_type == constants.TYPE_NVGRE and self._nvgre_enabled:
self._nvgre_ops.bind_nvgre_port(
segmentation_id, map['vswitch_name'], port_id)
elif network_type == constants.TYPE_FLAT:
# Nothing to do
pass
@@ -336,6 +424,9 @@ networking-plugin-hyperv_agent.html
return (resync_a | resync_b)
def daemon_loop(self):
# init NVGRE after the RPC connection and context is created.
self._init_nvgre()
sync = True
ports = set()
@@ -356,6 +447,8 @@ networking-plugin-hyperv_agent.html
sync = self._process_network_ports(port_info)
ports = port_info['current']
if self._nvgre_enabled:
self._nvgre_ops.refresh_nvgre_records()
self._port_enable_control_metrics()
except Exception:
LOG.exception(_LE("Error in agent event loop"))

View File

@@ -29,7 +29,13 @@ class HypervMechanismDriver(object):
"""
def get_allowed_network_types(self, agent=None):
return [constants.TYPE_LOCAL, constants.TYPE_FLAT, constants.TYPE_VLAN]
network_types = [constants.TYPE_LOCAL, constants.TYPE_FLAT,
constants.TYPE_VLAN]
if agent is not None:
tunnel_types = agent.get('configurations', {}).get('tunnel_types')
if tunnel_types:
network_types.extend(tunnel_types)
return network_types
def get_mappings(self, agent):
return agent['configurations'].get('vswitch_mappings', {})

View File

@@ -0,0 +1,134 @@
# Copyright 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutronclient.v2_0 import client as clientv20
from oslo_config import cfg
from oslo_log import log as logging
from hyperv.common.i18n import _LW, _LE # noqa
from hyperv.neutron import constants
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
neutron_opts = [
cfg.StrOpt('url',
default='http://127.0.0.1:9696',
help='URL for connecting to neutron'),
cfg.IntOpt('url_timeout',
default=30,
help='timeout value for connecting to neutron in seconds'),
cfg.StrOpt('admin_username',
help='username for connecting to neutron in admin context'),
cfg.StrOpt('admin_password',
help='password for connecting to neutron in admin context',
secret=True),
cfg.StrOpt('admin_tenant_name',
help='tenant name for connecting to neutron in admin context'),
cfg.StrOpt('admin_auth_url',
default='http://localhost:5000/v2.0',
help='auth url for connecting to neutron in admin context'),
cfg.StrOpt('auth_strategy',
default='keystone',
help='auth strategy for connecting to neutron in admin context')
]
CONF.register_opts(neutron_opts, 'neutron')
class NeutronAPIClient(object):
def __init__(self):
self._init_client()
def _init_client(self, token=None):
params = {
'endpoint_url': CONF.neutron.url,
'timeout': CONF.neutron.url_timeout,
'insecure': True,
'ca_cert': None,
}
if token:
params['token'] = token
params['auth_strategy'] = None
else:
params['username'] = CONF.neutron.admin_username
params['tenant_name'] = CONF.neutron.admin_tenant_name
params['password'] = CONF.neutron.admin_password
params['auth_url'] = CONF.neutron.admin_auth_url
params['auth_strategy'] = CONF.neutron.auth_strategy
self._client = clientv20.Client(**params)
def get_network_subnets(self, network_id):
try:
net = self._client.show_network(network_id)
return net['network']['subnets']
except Exception as ex:
LOG.error(_LE("Could not retrieve network %(network_id)s . Error: "
"%(ex)s"), {'network_id': network_id, 'ex': ex})
return []
def get_network_subnet_cidr_and_gateway(self, subnet_id):
try:
subnet = self._client.show_subnet(subnet_id)['subnet']
return (str(subnet['cidr']), str(subnet['gateway_ip']))
except Exception as ex:
LOG.error(_LE("Could not retrieve subnet %(subnet_id)s . Error: "
"%(ex)s: "), {'subnet_id': subnet_id, 'ex': ex})
return None, None
def get_port_ip_address(self, port_id):
try:
port = self._client.show_port(port_id)
fixed_ips = port['port']['fixed_ips'][0]
return fixed_ips['ip_address']
except Exception as ex:
LOG.error(_LE("Could not retrieve port %(port_id)s . Error: "
"%(ex)s"), {'port_id': port_id, 'ex': ex})
return None
def get_tunneling_agents(self):
try:
agents = self._client.list_agents()
tunneling_agents = [
a for a in agents['agents'] if constants.TYPE_NVGRE in
a.get('configurations', {}).get('tunnel_types', [])]
tunneling_ip_agents = [
a for a in tunneling_agents if
a.get('configurations', {}).get('tunneling_ip')]
if len(tunneling_ip_agents) < len(tunneling_agents):
LOG.warning(_LW('Some agents have NVGRE tunneling enabled, but'
' do not provide tunneling_ip. Ignoring those '
'agents.'))
return dict([(a['host'], a['configurations']['tunneling_ip'])
for a in tunneling_ip_agents])
except Exception as ex:
LOG.error(_LE("Could not get tunneling agents. Error: %s"), ex)
return {}
def get_network_ports(self, **kwargs):
try:
return self._client.list_ports(**kwargs)['ports']
except Exception as ex:
LOG.error(_LE("Exception caught: %s"), ex)
return []

176
hyperv/neutron/nvgre_ops.py Normal file
View File

@@ -0,0 +1,176 @@
# Copyright 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from hyperv.common.i18n import _LI, _LW, _LE # noqa
from hyperv.neutron import constants
from hyperv.neutron import hyperv_agent_notifier
from hyperv.neutron import neutron_client
from hyperv.neutron import utils_nvgre
from hyperv.neutron import utilsfactory
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class HyperVNvgreOps(object):
def __init__(self, physical_networks):
self.topic = constants.AGENT_TOPIC
self._vswitch_ips = {}
self._tunneling_agents = {}
self._nvgre_ports = []
self._network_vsids = {}
self._hyperv_utils = utilsfactory.get_hypervutils()
self._nvgre_utils = utils_nvgre.NvgreUtils()
self._n_client = neutron_client.NeutronAPIClient()
self._init_nvgre(physical_networks)
def init_notifier(self, context, rpc_client):
self.context = context
self._notifier = hyperv_agent_notifier.AgentNotifierApi(
self.topic, rpc_client)
def _init_nvgre(self, physical_networks):
for network in physical_networks:
LOG.info(_LI("Adding provider route and address for network: %s"),
network)
self._nvgre_utils.create_provider_route(network)
self._nvgre_utils.create_provider_address(
network, CONF.NVGRE.provider_vlan_id)
ip_addr, length = self._nvgre_utils.get_network_iface_ip(network)
self._vswitch_ips[network] = ip_addr
def _refresh_tunneling_agents(self):
self._tunneling_agents.update(self._n_client.get_tunneling_agents())
def lookup_update(self, kwargs):
lookup_ip = kwargs.get('lookup_ip')
lookup_details = kwargs.get('lookup_details')
LOG.info(_LI("Lookup Received: %(lookup_ip)s, %(lookup_details)s"),
{'lookup_ip': lookup_ip, 'lookup_details': lookup_details})
if not lookup_ip or not lookup_details:
return
self._register_lookup_record(lookup_ip,
lookup_details['customer_addr'],
lookup_details['mac_addr'],
lookup_details['customer_vsid'])
def tunnel_update(self, context, tunnel_ip, tunnel_type):
if tunnel_type != constants.TYPE_NVGRE:
return
self._notifier.tunnel_update(context, CONF.NVGRE.provider_tunnel_ip,
tunnel_type)
def _register_lookup_record(self, prov_addr, cust_addr, mac_addr, vsid):
LOG.info(_LI('Creating LookupRecord: VSID: %(vsid)s MAC: %(mac_addr)s '
'Customer IP: %(cust_addr)s Provider IP: %(prov_addr)s'),
dict(vsid=vsid,
mac_addr=mac_addr,
cust_addr=cust_addr,
prov_addr=prov_addr))
self._nvgre_utils.create_lookup_record(
prov_addr, cust_addr, mac_addr, vsid)
def bind_nvgre_port(self, segmentation_id, network_name, port_id):
mac_addr = self._hyperv_utils.get_vnic_mac_address(port_id)
provider_addr = self._nvgre_utils.get_network_iface_ip(network_name)[0]
customer_addr = self._n_client.get_port_ip_address(port_id)
if not provider_addr or not customer_addr:
LOG.warning(_LW('Cannot bind NVGRE port. Could not determine '
'provider address (%(prov_addr)s) or customer '
'address (%(cust_addr)s).'),
{'prov_addr': provider_addr,
'cust_addr': customer_addr})
return
LOG.info(_LI('Binding VirtualSubnetID %(segmentation_id)s '
'to switch port %(port_id)s'),
dict(segmentation_id=segmentation_id, port_id=port_id))
self._hyperv_utils.set_vswitch_port_vsid(segmentation_id, port_id)
# normal lookup record.
self._register_lookup_record(
provider_addr, customer_addr, mac_addr, segmentation_id)
# lookup record for dhcp requests.
self._register_lookup_record(
self._vswitch_ips[network_name], constants.IPV4_DEFAULT,
mac_addr, segmentation_id)
LOG.info('Fanning out LookupRecord...')
self._notifier.lookup_update(self.context,
provider_addr,
{'customer_addr': customer_addr,
'mac_addr': mac_addr,
'customer_vsid': segmentation_id})
def bind_nvgre_network(self, segmentation_id, net_uuid, vswitch_name):
subnets = self._n_client.get_network_subnets(net_uuid)
if len(subnets) > 1:
LOG.warning(_LW("Multiple subnets in the same network is not "
"supported."))
subnet = subnets[0]
try:
cidr, gw = self._n_client.get_network_subnet_cidr_and_gateway(
subnet)
self._nvgre_utils.create_customer_routes(
segmentation_id, vswitch_name, cidr, gw)
except Exception as ex:
LOG.error(_LE("Exception caught: %s"), ex)
self._network_vsids[net_uuid] = segmentation_id
self.refresh_nvgre_records(network_id=net_uuid)
self._notifier.tunnel_update(
self.context, CONF.NVGRE.provider_tunnel_ip, segmentation_id)
def refresh_nvgre_records(self, **kwargs):
self._refresh_tunneling_agents()
ports = self._n_client.get_network_ports(**kwargs)
# process ports that were not processed yet.
# process ports that are bound to tunneling_agents.
ports = [p for p in ports if p['id'] not in self._nvgre_ports and
p['binding:host_id'] in self._tunneling_agents and
p['network_id'] in self._network_vsids.keys()]
for port in ports:
tunneling_ip = self._tunneling_agents[port['binding:host_id']]
customer_addr = port['fixed_ips'][0]['ip_address']
mac_addr = port['mac_address'].replace(':', '')
segmentation_id = self._network_vsids[port['network_id']]
try:
self._register_lookup_record(
tunneling_ip, customer_addr, mac_addr, segmentation_id)
self._nvgre_ports.append(port['id'])
except Exception as ex:
LOG.error(_LE("Exception while adding lookup_record: %(ex)s. "
"VSID: %(vsid)s MAC: %(mac_address)s Customer "
"IP:%(cust_addr)s Provider IP: %(prov_addr)s"),
dict(ex=ex,
vsid=segmentation_id,
mac_address=mac_addr,
cust_addr=customer_addr,
prov_addr=tunneling_ip))

View File

@@ -95,6 +95,10 @@ class HyperVUtils(object):
for p in self._conn.Msvm_SyntheticEthernetPortSettingData()
if p.ElementName is not None)
def get_vnic_mac_address(self, switch_port_name):
vnic = self._get_vnic_settings(switch_port_name)
return vnic.Address
def _get_vnic_settings(self, vnic_name):
vnic_settings = self._conn.Msvm_SyntheticEthernetPortSettingData(
ElementName=vnic_name)

View File

@@ -0,0 +1,219 @@
# Copyright 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import uuid
from oslo_config import cfg
from oslo_log import log as logging
from hyperv.common.i18n import _, _LI, _LW, _LE # noqa
from hyperv.neutron import constants
from hyperv.neutron import utilsv2
# Check needed for unit testing on Unix
if sys.platform == 'win32':
import wmi
agent_opts = [
cfg.StrOpt('neutron_metadata_address',
default='169.254.169.254',
help=_('Specifies the address which will serve the metadata for'
' the instance.')),
]
CONF = cfg.CONF
CONF.register_opts(agent_opts, "AGENT")
LOG = logging.getLogger(__name__)
class NvgreUtils(object):
_HYPERV_VIRT_ADAPTER = 'Hyper-V Virtual Ethernet Adapter'
_IPV4_ADDRESS_FAMILY = 2
_WNV_BIND_NAME = 'Wnv'
_TRANSLATE_NAT = 0
_TRANSLATE_ENCAP = 1
_LOOKUP_RECORD_TYPE_STATIC = 0
_LOOKUP_RECORD_TYPE_L2_ONLY = 3
_STDCIMV2_NAMESPACE = '//./root/StandardCimv2'
def __init__(self):
super(NvgreUtils, self).__init__()
self._utils = utilsv2.HyperVUtilsV2()
self._net_if_indexes = {}
if sys.platform == 'win32':
self._scimv2 = wmi.WMI(moniker=self._STDCIMV2_NAMESPACE)
def create_provider_address(self, network_name, provider_vlan_id):
iface_index = self._get_network_iface_index(network_name)
(provider_addr, prefix_len) = self.get_network_iface_ip(network_name)
if iface_index is None or provider_addr is None:
return None
provider = (
self._scimv2.MSFT_NetVirtualizationProviderAddressSettingData(
ProviderAddress=provider_addr))
if provider:
if (provider[0].VlanID == provider_vlan_id and
provider[0].InterfaceIndex == iface_index):
# ProviderAddress already exists.
return
# ProviderAddress exists, but with different VlanID or iface index.
provider[0].Delete_()
self._create_new_object(
self._scimv2.MSFT_NetVirtualizationProviderAddressSettingData,
ProviderAddress=provider_addr,
VlanID=provider_vlan_id,
InterfaceIndex=iface_index,
PrefixLength=prefix_len)
def create_provider_route(self, network_name):
iface_index = self._get_network_iface_index(network_name)
if iface_index is None:
return
routes = self._scimv2.MSFT_NetVirtualizationProviderRouteSettingData(
InterfaceIndex=iface_index, NextHop=constants.IPV4_DEFAULT)
if not routes:
self._create_new_object(
self._scimv2.MSFT_NetVirtualizationProviderRouteSettingData,
InterfaceIndex=iface_index,
DestinationPrefix='%s/0' % constants.IPV4_DEFAULT,
NextHop=constants.IPV4_DEFAULT)
def create_customer_routes(self, vsid, network_name, dest_prefix, gateway):
cust_route_string = network_name + dest_prefix + str(vsid)
rdid_uuid = uuid.uuid5(uuid.NAMESPACE_X500, cust_route_string)
rdid_uuid = str(rdid_uuid)
routes = self._scimv2.MSFT_NetVirtualizationCustomerRouteSettingData(
VirtualSubnetID=vsid)
for route in routes:
route.Delete_()
# TODO(claudiub): the logic should be moved to nvgre_ops. This should
# create only one router per call.
self._create_cust_route(
vsid, dest_prefix, constants.IPV4_DEFAULT, rdid_uuid)
if not gateway:
LOG.info(_LI('Subnet does not have gateway configured. Skipping.'))
return
if gateway.split('.')[-1] == '1':
LOG.error(_LE('Subnet has unsupported gateway IP ending in 1: %s. '
'Any other gateway IP is supported.'), gateway)
return
# TODO(claudiub): what if there is no gateway?
self._create_cust_route(
vsid, '%s/0' % constants.IPV4_DEFAULT, gateway, rdid_uuid)
# customer route for metadata is also necessary.
self._create_cust_route(
vsid, '%s/32' % CONF.AGENT.neutron_metadata_address, gateway,
rdid_uuid)
def _create_cust_route(self, vsid, dest_prefix, next_hop, rdid_uuid):
self._create_new_object(
self._scimv2.MSFT_NetVirtualizationCustomerRouteSettingData,
VirtualSubnetID=vsid,
DestinationPrefix=dest_prefix,
NextHop=next_hop,
Metric=255,
RoutingDomainID='{%s}' % rdid_uuid)
def create_lookup_record(self, provider_addr, customer_addr, mac, vsid):
# check for existing entry.
lrec = self._scimv2.MSFT_NetVirtualizationLookupRecordSettingData(
CustomerAddress=customer_addr, VirtualSubnetID=vsid)
if (lrec and lrec[0].VirtualSubnetID == vsid and
lrec[0].ProviderAddress == provider_addr and
lrec[0].MACAddress == mac):
# lookup record already exists, nothing to do.
return
# create new lookup record.
if lrec:
lrec[0].Delete_()
if constants.IPV4_DEFAULT == customer_addr:
# customer address used for DHCP requests.
record_type = self._LOOKUP_RECORD_TYPE_L2_ONLY
else:
record_type = self._LOOKUP_RECORD_TYPE_STATIC
self._create_new_object(
self._scimv2.MSFT_NetVirtualizationLookupRecordSettingData,
VirtualSubnetID=vsid,
Rule=self._TRANSLATE_ENCAP,
Type=record_type,
MACAddress=mac,
CustomerAddress=customer_addr,
ProviderAddress=provider_addr)
def _create_new_object(self, object_class, **args):
new_obj = object_class.new(**args)
new_obj.Put_()
return new_obj
def _get_network_ifaces_by_name(self, network_name):
return [n for n in self._scimv2.MSFT_NetAdapter() if
n.Name.find(network_name) >= 0]
def _get_network_iface_index(self, network_name):
if self._net_if_indexes.get(network_name):
return self._net_if_indexes[network_name]
description = (
self._utils.get_vswitch_external_network_name(network_name))
# physical NIC and vswitch must have the same MAC address.
networks = self._scimv2.MSFT_NetAdapter(
InterfaceDescription=description)
if networks:
self._net_if_indexes[network_name] = networks[0].InterfaceIndex
return networks[0].InterfaceIndex
def get_network_iface_ip(self, network_name):
networks = [n for n in self._get_network_ifaces_by_name(network_name)
if n.DriverDescription == self._HYPERV_VIRT_ADAPTER]
if networks:
ip_addr = self._scimv2.MSFT_NetIPAddress(
InterfaceIndex=networks[0].InterfaceIndex,
AddressFamily=self._IPV4_ADDRESS_FAMILY)
if ip_addr:
return (ip_addr[0].IPAddress, ip_addr[0].PrefixLength)
else:
LOG.error(_LE('No IP Address could be found for network: %s'),
network_name)
else:
LOG.error(_LE('No vswitch was found with name: %s'), network_name)
return (None, None)

View File

@@ -145,10 +145,28 @@ class HyperVUtilsV2(utils.HyperVUtils):
vswitch_name)
return vswitch[0]
def _get_vswitch_external_port(self, vswitch_name):
vswitch = self._get_vswitch(vswitch_name)
ext_ports = self._conn.Msvm_ExternalEthernetPort()
for ext_port in ext_ports:
lan_endpoint_list = ext_port.associators(
wmi_result_class=self._LAN_ENDPOINT)
if lan_endpoint_list:
lan_endpoint_list = lan_endpoint_list[0].associators(
wmi_result_class=self._LAN_ENDPOINT)
if (lan_endpoint_list and
lan_endpoint_list[0].SystemName == vswitch.Name):
return ext_port
def set_switch_external_port_trunk_vlan(self, vswitch_name, vlan_id,
desired_endpoint_mode):
pass
def get_vswitch_external_network_name(self, vswitch_name):
ext_port = self._get_vswitch_external_port(vswitch_name)
if ext_port:
return ext_port.ElementName
def set_vswitch_port_vlan_id(self, vlan_id, switch_port_name):
port_alloc, found = self._get_switch_port_allocation(switch_port_name)
if not found:
@@ -176,15 +194,45 @@ class HyperVUtilsV2(utils.HyperVUtils):
port_alloc.path_(), [vlan_settings.GetText_(1)])
self._check_job_status(ret_val, job_path)
def set_vswitch_port_vsid(self, vsid, switch_port_name):
port_alloc, found = self._get_switch_port_allocation(switch_port_name)
if not found:
raise utils.HyperVException(
msg=_('Port Allocation not found: %s') % switch_port_name)
vsid_settings = self._get_security_setting_data_from_port_alloc(
port_alloc)
if vsid_settings:
if vsid_settings.VirtualSubnetId == vsid:
# VSID already added, no need to readd it.
return
# Removing the feature because it cannot be modified
# due to a wmi exception.
self._remove_virt_feature(vsid_settings)
(vsid_settings, found) = self._get_security_setting_data(
switch_port_name)
vsid_settings.VirtualSubnetId = vsid
self._add_virt_feature(port_alloc, vsid_settings)
def _get_vlan_setting_data_from_port_alloc(self, port_alloc):
return self._get_first_item(port_alloc.associators(
wmi_result_class=self._PORT_VLAN_SET_DATA))
def _get_security_setting_data_from_port_alloc(self, port_alloc):
return self._get_first_item(port_alloc.associators(
wmi_result_class=self._PORT_SECURITY_SET_DATA))
def _get_vlan_setting_data(self, switch_port_name, create=True):
return self._get_setting_data(
self._PORT_VLAN_SET_DATA,
switch_port_name, create)
def _get_security_setting_data(self, switch_port_name, create=True):
return self._get_setting_data(
self._PORT_SECURITY_SET_DATA, switch_port_name, create)
def _get_switch_port_allocation(self, switch_port_name, create=False):
return self._get_setting_data(
self._PORT_ALLOC_SET_DATA,

View File

@@ -0,0 +1,65 @@
# Copyright 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for Hyper-V Agent Notifier.
"""
import mock
from hyperv.neutron import constants
from hyperv.neutron import hyperv_agent_notifier
from hyperv.tests import base
class TestAgentNotifierApi(base.BaseTestCase):
def setUp(self):
super(TestAgentNotifierApi, self).setUp()
self.notifier = hyperv_agent_notifier.AgentNotifierApi(
topic=constants.AGENT_TOPIC, client=mock.MagicMock())
def test_tunnel_update(self):
expected_topic = hyperv_agent_notifier.get_topic_name(
constants.AGENT_TOPIC, constants.TUNNEL, constants.UPDATE)
self.notifier.tunnel_update(mock.sentinel.context,
mock.sentinel.tunnel_ip,
constants.TYPE_NVGRE)
self.notifier._client.prepare.assert_called_once_with(
topic=expected_topic, fanout=True)
prepared_client = self.notifier._client.prepare.return_value
prepared_client.cast.assert_called_once_with(
mock.sentinel.context, 'tunnel_update',
tunnel_ip=mock.sentinel.tunnel_ip,
tunnel_type=constants.TYPE_NVGRE)
def test_lookup_update(self):
expected_topic = hyperv_agent_notifier.get_topic_name(
constants.AGENT_TOPIC, constants.LOOKUP, constants.UPDATE)
self.notifier.lookup_update(mock.sentinel.context,
mock.sentinel.lookup_ip,
mock.sentinel.lookup_details)
self.notifier._client.prepare.assert_called_once_with(
topic=expected_topic, fanout=True)
prepared_client = self.notifier._client.prepare.return_value
prepared_client.cast.assert_called_once_with(
mock.sentinel.context, 'lookup_update',
lookup_ip=mock.sentinel.lookup_ip,
lookup_details=mock.sentinel.lookup_details)

View File

@@ -21,6 +21,7 @@ Unit tests for Windows Hyper-V virtual switch neutron driver
import mock
from hyperv.neutron import constants
from hyperv.neutron import hyperv_agent_notifier
from hyperv.neutron import hyperv_neutron_agent
from hyperv.neutron import utils
from hyperv.neutron import utilsfactory
@@ -42,7 +43,12 @@ class TestHyperVNeutronAgent(base.BaseTestCase):
self.agent._utils = mock.MagicMock()
self.agent.sec_groups_agent = mock.MagicMock()
self.agent.context = mock.Mock()
self.agent.client = mock.MagicMock()
self.agent.connection = mock.MagicMock()
self.agent.agent_id = mock.Mock()
self.agent.notifier = mock.Mock()
self.agent._utils = mock.MagicMock()
self.agent._nvgre_ops = mock.MagicMock()
def test_load_physical_network_mappings(self):
test_mappings = ['fake_network_1:fake_vswitch',
@@ -56,6 +62,54 @@ class TestHyperVNeutronAgent(base.BaseTestCase):
self.assertEqual(expected,
self.agent._physical_network_mappings.items())
@mock.patch.object(hyperv_neutron_agent.nvgre_ops, 'HyperVNvgreOps')
def test_init_nvgre_disabled(self, mock_hyperv_nvgre_ops):
self.agent._init_nvgre()
self.assertFalse(mock_hyperv_nvgre_ops.called)
self.assertFalse(self.agent._nvgre_enabled)
@mock.patch.object(hyperv_neutron_agent.nvgre_ops, 'HyperVNvgreOps')
def test_init_nvgre_no_tunnel_ip(self, mock_hyperv_nvgre_ops):
self.config(enable_support=True, group='NVGRE')
self.assertRaises(utils.HyperVException, self.agent._init_nvgre)
@mock.patch.object(hyperv_neutron_agent.nvgre_ops, 'HyperVNvgreOps')
def test_init_nvgre_enabled(self, mock_hyperv_nvgre_ops):
self.config(enable_support=True, group='NVGRE')
self.config(provider_tunnel_ip=mock.sentinel.tunneling_ip,
group='NVGRE')
self.agent._init_nvgre()
mock_hyperv_nvgre_ops.assert_called_once_with(
self.agent._physical_network_mappings.values())
self.assertTrue(self.agent._nvgre_enabled)
self.agent._nvgre_ops.init_notifier.assert_called_once_with(
self.agent.context, self.agent.client)
expected_topic = hyperv_agent_notifier.get_topic_name(
self.agent.topic, constants.LOOKUP, constants.UPDATE)
self.agent.connection.create_consumer.assert_called_once_with(
expected_topic, self.agent.endpoints, fanout=True)
self.agent.connection.servers[-1].start.assert_called_once_with()
def test_get_agent_configurations(self):
actual = self.agent.get_agent_configurations()
self.assertEqual(self.agent._physical_network_mappings,
actual['vswitch_mappings'])
self.assertNotIn('tunnel_types', actual)
self.assertNotIn('tunneling_ip', actual)
def test_get_agent_configurations_nvgre(self):
self.config(enable_support=True, group='NVGRE')
self.config(provider_tunnel_ip=mock.sentinel.tunneling_ip,
group='NVGRE')
actual = self.agent.get_agent_configurations()
self.assertEqual(self.agent._physical_network_mappings,
actual['vswitch_mappings'])
self.assertEqual([constants.TYPE_NVGRE], actual['tunnel_types'])
self.assertEqual(mock.sentinel.tunneling_ip, actual['tunneling_ip'])
def test_get_network_vswitch_map_by_port_id(self):
net_uuid = 'net-uuid'
self.agent._network_vswitch_map = {
@@ -78,6 +132,14 @@ class TestHyperVNeutronAgent(base.BaseTestCase):
self.assertIsNone(network)
self.assertIsNone(port_map)
def test_lookup_update(self):
kwargs = {'lookup_ip': mock.sentinel.lookup_ip,
'lookup_details': mock.sentinel.lookup_details}
self.agent.lookup_update(mock.sentinel.context, **kwargs)
self.agent._nvgre_ops.lookup_update.assert_called_once_with(kwargs)
@mock.patch.object(hyperv_neutron_agent.HyperVNeutronAgentMixin,
"_get_vswitch_name")
def test_provision_network_exception(self, mock_get_vswitch_name):
@@ -108,6 +170,25 @@ class TestHyperVNeutronAgent(base.BaseTestCase):
mock.sentinel.FAKE_SEGMENTATION_ID,
constants.TRUNK_ENDPOINT_MODE)
@mock.patch.object(hyperv_neutron_agent.HyperVNeutronAgentMixin,
"_get_vswitch_name")
def test_provision_network_nvgre(self, mock_get_vswitch_name):
self.agent._nvgre_enabled = True
vswitch_name = mock_get_vswitch_name.return_value
self.agent._provision_network(mock.sentinel.FAKE_PORT_ID,
mock.sentinel.FAKE_NET_UUID,
constants.TYPE_NVGRE,
mock.sentinel.FAKE_PHYSICAL_NETWORK,
mock.sentinel.FAKE_SEGMENTATION_ID)
mock_get_vswitch_name.assert_called_once_with(
constants.TYPE_NVGRE,
mock.sentinel.FAKE_PHYSICAL_NETWORK)
self.agent._nvgre_ops.bind_nvgre_network.assert_called_once_with(
mock.sentinel.FAKE_SEGMENTATION_ID,
mock.sentinel.FAKE_NET_UUID,
vswitch_name)
@mock.patch.object(hyperv_neutron_agent.HyperVNeutronAgentMixin,
"_get_vswitch_name")
def test_provision_network_flat(self, mock_get_vswitch_name):
@@ -155,6 +236,34 @@ class TestHyperVNeutronAgent(base.BaseTestCase):
self.assertEqual(enable_metrics, mock_enable_metrics.called)
@mock.patch.object(hyperv_neutron_agent.HyperVNeutronAgentMixin,
'_provision_network')
def test_port_bound_nvgre(self, mock_provision_network):
self.agent._nvgre_enabled = True
network_type = constants.TYPE_NVGRE
net_uuid = 'my-net-uuid'
fake_map = {'vswitch_name': mock.sentinel.vswitch_name,
'ports': []}
def fake_prov_network(*args, **kwargs):
self.agent._network_vswitch_map[net_uuid] = fake_map
mock_provision_network.side_effect = fake_prov_network
self.agent._port_bound(mock.sentinel.port_id, net_uuid, network_type,
mock.sentinel.physical_network,
mock.sentinel.segmentation_id)
self.assertIn(mock.sentinel.port_id, fake_map['ports'])
mock_provision_network.assert_called_once_with(
mock.sentinel.port_id, net_uuid, network_type,
mock.sentinel.physical_network, mock.sentinel.segmentation_id)
self.agent._utils.connect_vnic_to_vswitch.assert_called_once_with(
mock.sentinel.vswitch_name, mock.sentinel.port_id)
self.agent._nvgre_ops.bind_nvgre_port.assert_called_once_with(
mock.sentinel.segmentation_id, mock.sentinel.vswitch_name,
mock.sentinel.port_id)
@mock.patch.object(hyperv_neutron_agent.HyperVNeutronAgentMixin,
'_get_network_vswitch_map_by_port_id')
def _check_port_unbound(self, mock_get_vswitch_map_by_port_id, ports=None,

View File

@@ -0,0 +1,45 @@
# Copyright 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for the Hyper-V Mechanism Driver.
"""
from hyperv.neutron import constants
from hyperv.neutron.ml2 import mech_hyperv
from hyperv.tests import base
class TestHypervMechanismDriver(base.BaseTestCase):
def setUp(self):
super(TestHypervMechanismDriver, self).setUp()
self.mech_hyperv = mech_hyperv.HypervMechanismDriver()
def test_get_allowed_network_types(self):
agent = {'configurations': {'tunnel_types': []}}
actual_net_types = self.mech_hyperv.get_allowed_network_types(agent)
network_types = [constants.TYPE_LOCAL, constants.TYPE_FLAT,
constants.TYPE_VLAN]
self.assertEqual(network_types, actual_net_types)
def test_get_allowed_network_types_nvgre(self):
agent = {'configurations': {'tunnel_types': [constants.TYPE_NVGRE]}}
actual_net_types = self.mech_hyperv.get_allowed_network_types(agent)
network_types = [constants.TYPE_LOCAL, constants.TYPE_FLAT,
constants.TYPE_VLAN, constants.TYPE_NVGRE]
self.assertEqual(network_types, actual_net_types)

View File

@@ -0,0 +1,133 @@
# Copyright 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for the neutron client.
"""
import mock
from hyperv.neutron import constants
from hyperv.neutron import neutron_client
from hyperv.tests import base
class TestNeutronClient(base.BaseTestCase):
_FAKE_CIDR = '10.0.0.0/24'
_FAKE_GATEWAY = '10.0.0.1'
_FAKE_HOST = 'fake_host'
def setUp(self):
super(TestNeutronClient, self).setUp()
self._neutron = neutron_client.NeutronAPIClient()
self._neutron._client = mock.MagicMock()
def test_get_network_subnets(self):
self._neutron._client.show_network.return_value = {
'network': {
'subnets': [mock.sentinel.fake_subnet]
}
}
subnets = self._neutron.get_network_subnets(mock.sentinel.net_id)
self._neutron._client.show_network.assert_called_once_with(
mock.sentinel.net_id)
self.assertEqual([mock.sentinel.fake_subnet], subnets)
def test_get_network_subnets_exception(self):
self._neutron._client.show_network.side_effect = Exception("Fail")
subnets = self._neutron.get_network_subnets(mock.sentinel.net_id)
self.assertEqual([], subnets)
def test_get_network_subnet_cidr(self):
self._neutron._client.show_subnet.return_value = {
'subnet': {
'cidr': self._FAKE_CIDR,
'gateway_ip': self._FAKE_GATEWAY,
}
}
cidr, gw = self._neutron.get_network_subnet_cidr_and_gateway(
mock.sentinel.subnet_id)
self._neutron._client.show_subnet.assert_called_once_with(
mock.sentinel.subnet_id)
self.assertEqual(self._FAKE_CIDR, cidr)
self.assertEqual(self._FAKE_GATEWAY, gw)
def test_get_network_subnet_cidr_exception(self):
self._neutron._client.show_subnet.side_effect = Exception("Fail")
cidr, gw = self._neutron.get_network_subnet_cidr_and_gateway(
mock.sentinel.subnet_id)
self.assertIsNone(cidr)
self.assertIsNone(gw)
def test_get_port_ip_address(self):
self._neutron._client.show_port.return_value = {
'port': {
'fixed_ips': [{'ip_address': mock.sentinel.ip_addr}]
}
}
ip_addr = self._neutron.get_port_ip_address(mock.sentinel.fake_port_id)
self._neutron._client.show_port.assert_called_once_with(
mock.sentinel.fake_port_id)
self.assertEqual(mock.sentinel.ip_addr, ip_addr)
def test_get_port_ip_address_exception(self):
self._neutron._client.show_port.side_effect = Exception("Fail")
ip_addr = self._neutron.get_port_ip_address(mock.sentinel.fake_port_id)
self.assertIsNone(ip_addr)
def test_get_tunneling_agents(self):
non_tunnel_agent = {}
ignored_agent = {'configurations': {
'tunnel_types': [constants.TYPE_NVGRE]}
}
tunneling_agent = {
'configurations': {'tunnel_types': [constants.TYPE_NVGRE],
'tunneling_ip': mock.sentinel.tunneling_ip},
'host': self._FAKE_HOST
}
self._neutron._client.list_agents.return_value = {
'agents': [non_tunnel_agent, ignored_agent, tunneling_agent]
}
actual = self._neutron.get_tunneling_agents()
self.assertEqual({self._FAKE_HOST: mock.sentinel.tunneling_ip}, actual)
def test_get_tunneling_agents_exception(self):
self._neutron._client.list_agents.side_effect = Exception("Fail")
actual = self._neutron.get_tunneling_agents()
self.assertEqual({}, actual)
def test_get_network_ports(self):
self._neutron._client.list_ports.return_value = {
'ports': [mock.sentinel.port]
}
actual = self._neutron.get_network_ports(key='value')
self._neutron._client.list_ports.assert_called_once_with(key='value')
self.assertEqual([mock.sentinel.port], actual)
def test_get_network_ports_exception(self):
self._neutron._client.list_ports.side_effect = Exception("Fail")
actual = self._neutron.get_network_ports()
self.assertEqual([], actual)

View File

@@ -0,0 +1,183 @@
# Copyright 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for Windows Hyper-V NVGRE driver.
"""
import mock
from hyperv.neutron import constants
from hyperv.neutron import nvgre_ops
from hyperv.neutron import utilsfactory
from hyperv.tests import base
class TestHyperVNvgreOps(base.BaseTestCase):
FAKE_MAC_ADDR = 'fa:ke:ma:ca:dd:re:ss'
def setUp(self):
super(TestHyperVNvgreOps, self).setUp()
utilsfactory._get_windows_version = mock.MagicMock(
return_value='6.2.0')
self.context = 'context'
self.ops = nvgre_ops.HyperVNvgreOps([])
self.ops._vswitch_ips[mock.sentinel.network_name] = (
mock.sentinel.ip_addr)
self.ops.context = self.context
self.ops._notifier = mock.MagicMock()
self.ops._hyperv_utils = mock.MagicMock()
self.ops._nvgre_utils = mock.MagicMock()
self.ops._n_client = mock.MagicMock()
self.ops._db = mock.MagicMock()
def test_refresh_tunneling_agents(self):
self.ops._n_client.get_tunneling_agents.return_value = {
mock.sentinel.host: mock.sentinel.host_ip
}
self.ops._refresh_tunneling_agents()
self.assertEqual(mock.sentinel.host_ip,
self.ops._tunneling_agents[mock.sentinel.host])
@mock.patch.object(nvgre_ops.HyperVNvgreOps, '_register_lookup_record')
def test_lookup_update(self, mock_register_record):
args = {'lookup_ip': mock.sentinel.lookup_ip,
'lookup_details': {
'customer_addr': mock.sentinel.customer_addr,
'mac_addr': mock.sentinel.mac_addr,
'customer_vsid': mock.sentinel.vsid}
}
self.ops.lookup_update(args)
mock_register_record.assert_called_once_with(
mock.sentinel.lookup_ip,
mock.sentinel.customer_addr,
mock.sentinel.mac_addr,
mock.sentinel.vsid)
@mock.patch.object(nvgre_ops.HyperVNvgreOps, '_register_lookup_record')
def test_lookup_update_no_details(self, mock_register_record):
self.ops.lookup_update({})
self.assertFalse(mock_register_record.called)
def test_register_lookup_record(self):
self.ops._register_lookup_record(
mock.sentinel.provider_addr, mock.sentinel.customer_addr,
mock.sentinel.mac_addr, mock.sentinel.vsid)
self.ops._nvgre_utils.create_lookup_record.assert_called_once_with(
mock.sentinel.provider_addr, mock.sentinel.customer_addr,
mock.sentinel.mac_addr, mock.sentinel.vsid)
@mock.patch.object(nvgre_ops.HyperVNvgreOps, '_register_lookup_record')
def test_bind_nvgre_port(self, mock_register_record):
self.ops._nvgre_utils.get_network_iface_ip.return_value = (
mock.sentinel.provider_addr, mock.sentinel.prefix_len)
mac_addr = self.ops._hyperv_utils.get_vnic_mac_address.return_value
customer_addr = self.ops._n_client.get_port_ip_address.return_value
self.ops.bind_nvgre_port(mock.sentinel.vsid,
mock.sentinel.network_name,
mock.sentinel.port_id)
self.ops._hyperv_utils.set_vswitch_port_vsid.assert_called_once_with(
mock.sentinel.vsid, mock.sentinel.port_id)
mock_register_record.assert_has_calls([
mock.call(mock.sentinel.provider_addr, customer_addr, mac_addr,
mock.sentinel.vsid),
mock.call(mock.sentinel.ip_addr, constants.IPV4_DEFAULT, mac_addr,
mock.sentinel.vsid)])
self.ops._notifier.lookup_update.assert_called_once_with(
self.context, mock.sentinel.provider_addr, {
'customer_addr': customer_addr,
'mac_addr': mac_addr,
'customer_vsid': mock.sentinel.vsid
})
def test_bind_nvgre_port_no_provider_addr(self):
self.ops._nvgre_utils.get_network_iface_ip = mock.MagicMock(
return_value=(None, None))
self.ops.bind_nvgre_port(mock.sentinel.vsid,
mock.sentinel.network_name,
mock.sentinel.port_id)
self.assertFalse(self.ops._hyperv_utils.set_vswitch_port_vsid.called)
@mock.patch.object(nvgre_ops.HyperVNvgreOps, 'refresh_nvgre_records')
def test_bind_nvgre_network(self, mock_refresh_records):
self.config(provider_tunnel_ip=mock.sentinel.ip_addr, group='NVGRE')
self.ops._n_client.get_network_subnets.return_value = [
mock.sentinel.subnet, mock.sentinel.subnet2]
get_cidr = self.ops._n_client.get_network_subnet_cidr_and_gateway
get_cidr.return_value = (mock.sentinel.cidr, mock.sentinel.gateway)
self.ops.bind_nvgre_network(
mock.sentinel.vsid, mock.sentinel.net_uuid,
mock.sentinel.vswitch_name)
self.assertEqual(mock.sentinel.vsid,
self.ops._network_vsids[mock.sentinel.net_uuid])
self.ops._n_client.get_network_subnets.assert_called_once_with(
mock.sentinel.net_uuid)
get_cidr.assert_called_once_with(mock.sentinel.subnet)
self.ops._nvgre_utils.create_customer_routes.assert_called_once_with(
mock.sentinel.vsid, mock.sentinel.vswitch_name,
mock.sentinel.cidr, mock.sentinel.gateway)
mock_refresh_records.assert_called_once_with(
network_id=mock.sentinel.net_uuid)
self.ops._notifier.tunnel_update.assert_called_once_with(
self.context, mock.sentinel.ip_addr, mock.sentinel.vsid)
@mock.patch.object(nvgre_ops.HyperVNvgreOps, '_register_lookup_record')
def test_refresh_nvgre_records(self, mock_register_record):
self.ops._nvgre_ports.append(mock.sentinel.processed_port_id)
self.ops._tunneling_agents[mock.sentinel.host_id] = (
mock.sentinel.agent_ip)
self.ops._network_vsids[mock.sentinel.net_id] = (
mock.sentinel.vsid)
processed_port = {'id': mock.sentinel.processed_port_id}
no_host_port = {'id': mock.sentinel.port_no_host_id,
'binding:host_id': mock.sentinel.odd_host_id}
other_net_id_port = {'id': mock.sentinel.port_other_net_id,
'binding:host_id': mock.sentinel.host_id,
'network_id': mock.sentinel.odd_net_id}
port = {'id': mock.sentinel.port_id,
'binding:host_id': mock.sentinel.host_id,
'network_id': mock.sentinel.net_id,
'mac_address': self.FAKE_MAC_ADDR,
'fixed_ips': [{'ip_address': mock.sentinel.customer_addr}]
}
self.ops._n_client.get_network_ports.return_value = [
processed_port, no_host_port, other_net_id_port, port]
self.ops.refresh_nvgre_records()
expected_mac = self.FAKE_MAC_ADDR.replace(':', '')
mock_register_record.assert_has_calls([
mock.call(mock.sentinel.agent_ip, mock.sentinel.customer_addr,
expected_mac, mock.sentinel.vsid),
# mock.call(mock.sentinel.agent_ip, constants.METADATA_ADDR,
# expected_mac, mock.sentinel.vsid)
])
self.assertIn(mock.sentinel.port_id, self.ops._nvgre_ports)

View File

@@ -34,6 +34,15 @@ class HyperVUtilsTestCase(base.BaseTestCase):
self.utils = utils.HyperVUtils()
self.utils._wmi_conn = mock.MagicMock()
@mock.patch.object(utils.HyperVUtils, '_get_vnic_settings')
def test_get_vnic_mac_address(self, mock_get_vnic_settings):
mock_vnic = mock.MagicMock(Address=mock.sentinel.mac_address)
mock_get_vnic_settings.return_value = mock_vnic
actual_mac_address = self.utils.get_vnic_mac_address(
mock.sentinel.switch_port_name)
self.assertEqual(mock.sentinel.mac_address, actual_mac_address)
@mock.patch.object(utils.HyperVUtils, "_get_switch_port_path_by_name")
def test_disconnect_switch_port_not_found(self, mock_get_swp_path):
mock_svc = self.utils._conn.Msvm_VirtualSwitchManagementService()[0]

View File

@@ -0,0 +1,278 @@
# Copyright 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for the Hyper-V NVGRE support.
"""
import mock
from oslo_config import cfg
from hyperv.neutron import constants
from hyperv.neutron import utils_nvgre
from hyperv.tests import base
CONF = cfg.CONF
class TestHyperVNvgreUtils(base.BaseTestCase):
_FAKE_RDID = 'fake_rdid'
_FAKE_NETWORK_NAME = 'fake_network_name'
_FAKE_VSID = 9001
_FAKE_DEST_PREFIX = 'fake_dest_prefix'
_FAKE_GW_BAD = '10.0.0.1'
_FAKE_GW = '10.0.0.2'
def setUp(self):
super(TestHyperVNvgreUtils, self).setUp()
self.utils = utils_nvgre.NvgreUtils()
self.utils._utils = mock.MagicMock()
self.utils._scimv2 = mock.MagicMock()
def _create_mock_binding(self):
binding = mock.MagicMock()
binding.BindName = self.utils._WNV_BIND_NAME
binding.Name = mock.sentinel.fake_network
net_binds = self.utils._scimv2.MSFT_NetAdapterBindingSettingData
net_binds.return_value = [binding]
return binding
@mock.patch.object(utils_nvgre.NvgreUtils, 'get_network_iface_ip')
@mock.patch.object(utils_nvgre.NvgreUtils, '_get_network_iface_index')
def test_create_provider_address(self, mock_get_iface_index,
mock_get_iface_ip):
mock_get_iface_index.return_value = mock.sentinel.iface_index
mock_get_iface_ip.return_value = (mock.sentinel.iface_ip,
mock.sentinel.prefix_len)
provider_addr = mock.MagicMock()
scimv2 = self.utils._scimv2
obj_class = scimv2.MSFT_NetVirtualizationProviderAddressSettingData
obj_class.return_value = [provider_addr]
self.utils.create_provider_address(mock.sentinel.fake_network,
mock.sentinel.fake_vlan_id)
self.assertTrue(provider_addr.Delete_.called)
obj_class.new.assert_called_once_with(
ProviderAddress=mock.sentinel.iface_ip,
VlanID=mock.sentinel.fake_vlan_id,
InterfaceIndex=mock.sentinel.iface_index,
PrefixLength=mock.sentinel.prefix_len)
@mock.patch.object(utils_nvgre.NvgreUtils, 'get_network_iface_ip')
@mock.patch.object(utils_nvgre.NvgreUtils, '_get_network_iface_index')
def test_create_provider_address_none(self, mock_get_iface_index,
mock_get_iface_ip):
mock_get_iface_ip.return_value = (None, None)
self.utils.create_provider_address(mock.sentinel.fake_network,
mock.sentinel.fake_vlan_id)
scimv2 = self.utils._scimv2
self.assertFalse(
scimv2.MSFT_NetVirtualizationProviderAddressSettingData.new.called)
@mock.patch.object(utils_nvgre.NvgreUtils, 'get_network_iface_ip')
@mock.patch.object(utils_nvgre.NvgreUtils, '_get_network_iface_index')
def test_create_provider_address_exists(self, mock_get_iface_index,
mock_get_iface_ip):
mock_get_iface_index.return_value = mock.sentinel.iface_index
mock_get_iface_ip.return_value = (mock.sentinel.iface_ip,
mock.sentinel.prefix_len)
provider_addr = mock.MagicMock(
VlanID=mock.sentinel.fake_vlan_id,
InterfaceIndex=mock.sentinel.iface_index)
scimv2 = self.utils._scimv2
obj_class = scimv2.MSFT_NetVirtualizationProviderAddressSettingData
obj_class.return_value = [provider_addr]
self.utils.create_provider_address(mock.sentinel.fake_network,
mock.sentinel.fake_vlan_id)
self.assertFalse(obj_class.new.called)
@mock.patch.object(utils_nvgre.NvgreUtils, '_get_network_iface_index')
def test_create_provider_route(self, mock_get_iface_index):
mock_get_iface_index.return_value = mock.sentinel.iface_index
self.utils._scimv2.MSFT_NetVirtualizationProviderRouteSettingData = (
mock.MagicMock(return_value=[]))
self.utils.create_provider_route(mock.sentinel.fake_network)
scimv2 = self.utils._scimv2
obj_class = scimv2.MSFT_NetVirtualizationProviderRouteSettingData
obj_class.new.assert_called_once_with(
InterfaceIndex=mock.sentinel.iface_index,
DestinationPrefix='%s/0' % constants.IPV4_DEFAULT,
NextHop=constants.IPV4_DEFAULT)
@mock.patch.object(utils_nvgre.NvgreUtils, '_get_network_iface_index')
def test_create_provider_route_none(self, mock_get_iface_index):
mock_get_iface_index.return_value = None
self.utils.create_provider_route(mock.sentinel.fake_network)
scimv2 = self.utils._scimv2
self.assertFalse(
scimv2.MSFT_NetVirtualizationProviderRouteSettingData.new.called)
@mock.patch.object(utils_nvgre.NvgreUtils, '_get_network_iface_index')
def test_create_provider_route_exists(self, mock_get_iface_index):
mock_get_iface_index.return_value = mock.sentinel.iface_index
self.utils._scimv2.MSFT_NetVirtualizationProviderRouteSettingData = (
mock.MagicMock(return_value=[mock.MagicMock()]))
self.utils.create_provider_route(mock.sentinel.fake_network)
scimv2 = self.utils._scimv2
self.assertFalse(
scimv2.MSFT_NetVirtualizationProviderRouteSettingData.new.called)
@mock.patch.object(utils_nvgre.NvgreUtils, '_create_cust_route')
def _check_create_customer_routes(self, mock_create_route, gateway=None):
customer_route = mock.MagicMock()
scimv2 = self.utils._scimv2
obj_class = scimv2.MSFT_NetVirtualizationCustomerRouteSettingData
obj_class.return_value = [customer_route]
self.utils.create_customer_routes(
self._FAKE_VSID, self._FAKE_NETWORK_NAME, self._FAKE_DEST_PREFIX,
gateway)
routes = [(self._FAKE_DEST_PREFIX, constants.IPV4_DEFAULT)]
if gateway and gateway[-1] != '1':
routes.append(('%s/0' % constants.IPV4_DEFAULT, gateway))
routes.append(('%s/32' % CONF.AGENT.neutron_metadata_address,
gateway))
expected_calls = [
mock.call(self._FAKE_VSID, dest_prefix, next_hop, mock.ANY)
for dest_prefix, next_hop in routes]
self.assertTrue(customer_route.Delete_.called)
mock_create_route.assert_has_calls(expected_calls)
def test_create_customer_route(self):
self._check_create_customer_routes(gateway=self._FAKE_GW)
def test_create_customer_route_no_gateway(self):
self._check_create_customer_routes()
def test_create_customer_route_bad_gateway(self):
self._check_create_customer_routes(gateway=self._FAKE_GW_BAD)
def test_create_cust_route(self):
self.utils._create_cust_route(
mock.sentinel.fake_vsid, mock.sentinel.dest_prefix,
mock.sentinel.next_hop, self._FAKE_RDID)
scimv2 = self.utils._scimv2
obj_class = scimv2.MSFT_NetVirtualizationCustomerRouteSettingData
obj_class.new.assert_called_once_with(
VirtualSubnetID=mock.sentinel.fake_vsid,
DestinationPrefix=mock.sentinel.dest_prefix,
NextHop=mock.sentinel.next_hop,
Metric=255,
RoutingDomainID='{%s}' % self._FAKE_RDID)
def _check_create_lookup_record(self, customer_addr, expected_type):
lookup = mock.MagicMock()
scimv2 = self.utils._scimv2
obj_class = scimv2.MSFT_NetVirtualizationLookupRecordSettingData
obj_class.return_value = [lookup]
self.utils.create_lookup_record(mock.sentinel.provider_addr,
customer_addr,
mock.sentinel.mac_addr,
mock.sentinel.fake_vsid)
self.assertTrue(lookup.Delete_.called)
obj_class.new.assert_called_once_with(
VirtualSubnetID=mock.sentinel.fake_vsid,
Rule=self.utils._TRANSLATE_ENCAP,
Type=expected_type,
MACAddress=mock.sentinel.mac_addr,
CustomerAddress=customer_addr,
ProviderAddress=mock.sentinel.provider_addr)
def test_create_lookup_record_l2_only(self):
self._check_create_lookup_record(
constants.IPV4_DEFAULT,
self.utils._LOOKUP_RECORD_TYPE_L2_ONLY)
def test_create_lookup_record_static(self):
self._check_create_lookup_record(
mock.sentinel.customer_addr,
self.utils._LOOKUP_RECORD_TYPE_STATIC)
def test_create_lookup_record_exists(self):
lookup = mock.MagicMock(VirtualSubnetID=mock.sentinel.fake_vsid,
ProviderAddress=mock.sentinel.provider_addr,
CustomerAddress=mock.sentinel.customer_addr,
MACAddress=mock.sentinel.mac_addr)
scimv2 = self.utils._scimv2
obj_class = scimv2.MSFT_NetVirtualizationLookupRecordSettingData
obj_class.return_value = [lookup]
self.utils.create_lookup_record(mock.sentinel.provider_addr,
mock.sentinel.customer_addr,
mock.sentinel.mac_addr,
mock.sentinel.fake_vsid)
self.assertFalse(obj_class.new.called)
def test_get_network_iface_index(self):
fake_network = mock.MagicMock(InterfaceIndex=mock.sentinel.iface_index)
self.utils._scimv2.MSFT_NetAdapter.return_value = [fake_network]
description = (
self.utils._utils.get_vswitch_external_network_name.return_value)
index = self.utils._get_network_iface_index(mock.sentinel.fake_network)
self.assertEqual(mock.sentinel.iface_index, index)
self.assertIn(mock.sentinel.fake_network, self.utils._net_if_indexes)
self.utils._scimv2.MSFT_NetAdapter.assert_called_once_with(
InterfaceDescription=description)
def test_get_network_iface_index_cached(self):
self.utils._net_if_indexes[mock.sentinel.fake_network] = (
mock.sentinel.iface_index)
index = self.utils._get_network_iface_index(mock.sentinel.fake_network)
self.assertEqual(mock.sentinel.iface_index, index)
self.assertFalse(self.utils._scimv2.MSFT_NetAdapter.called)
@mock.patch.object(utils_nvgre.NvgreUtils, '_get_network_ifaces_by_name')
def test_get_network_iface_ip(self, mock_get_net_ifaces):
fake_network = mock.MagicMock(
InterfaceIndex=mock.sentinel.iface_index,
DriverDescription=self.utils._HYPERV_VIRT_ADAPTER)
mock_get_net_ifaces.return_value = [fake_network]
fake_netip = mock.MagicMock(IPAddress=mock.sentinel.provider_addr,
PrefixLength=mock.sentinel.prefix_len)
self.utils._scimv2.MSFT_NetIPAddress.return_value = [fake_netip]
pair = self.utils.get_network_iface_ip(mock.sentinel.fake_network)
self.assertEqual(
(mock.sentinel.provider_addr, mock.sentinel.prefix_len), pair)
@mock.patch.object(utils_nvgre.NvgreUtils, '_get_network_ifaces_by_name')
def test_get_network_iface_ip_none(self, mock_get_net_ifaces):
mock_get_net_ifaces.return_value = []
pair = self.utils.get_network_iface_ip(mock.sentinel.fake_network)
self.assertEqual((None, None), pair)

View File

@@ -190,6 +190,29 @@ class TestHyperVUtilsV2(base.BaseTestCase):
self.assertRaises(utils.HyperVException, self._utils._get_vswitch,
self._FAKE_VSWITCH_NAME)
def test_get_vswitch_external_port(self):
vswitch = mock.MagicMock(Name=mock.sentinel.vswitch_name)
self._utils._conn.Msvm_VirtualEthernetSwitch.return_value = [vswitch]
ext_port = mock.MagicMock()
lan_endpoint1 = mock.MagicMock()
ext_port.associators.return_value = [lan_endpoint1]
lan_endpoint2 = mock.MagicMock(SystemName=mock.sentinel.vswitch_name)
lan_endpoint1.associators.return_value = [lan_endpoint2]
self._utils._conn.Msvm_ExternalEthernetPort.return_value = [ext_port]
result = self._utils._get_vswitch_external_port(mock.sentinel.name)
self.assertEqual(ext_port, result)
@mock.patch.object(utilsv2.HyperVUtilsV2, '_get_vswitch_external_port')
def test_get_vswitch_external_network_name(self, mock_get_vswitch_port):
mock_get_vswitch_port.return_value.ElementName = (
mock.sentinel.network_name)
result = self._utils.get_vswitch_external_network_name(
mock.sentinel.vswitch_name)
self.assertEqual(mock.sentinel.network_name, result)
def test_set_vswitch_port_vlan_id(self):
self._mock_get_switch_port_alloc(found=True)
self._utils._get_vlan_setting_data_from_port_alloc = mock.MagicMock()
@@ -226,6 +249,36 @@ class TestHyperVUtilsV2(base.BaseTestCase):
self.assertFalse(mock_svc.RemoveFeatureSettings.called)
self.assertFalse(mock_svc.AddFeatureSettings.called)
@mock.patch.object(utilsv2.HyperVUtilsV2, '_add_virt_feature')
@mock.patch.object(utilsv2.HyperVUtilsV2, '_remove_virt_feature')
@mock.patch.object(utilsv2.HyperVUtilsV2, '_get_security_setting_data')
def test_set_vswitch_port_vsid(self, mock_get_security_sd, mock_rm_feat,
mock_add_feat):
mock_port_alloc = self._mock_get_switch_port_alloc()
mock_vsid_settings = mock.MagicMock()
mock_port_alloc.associators.return_value = [mock_vsid_settings]
mock_get_security_sd.return_value = (mock_vsid_settings, True)
self._utils.set_vswitch_port_vsid(mock.sentinel.vsid,
mock.sentinel.switch_port_name)
mock_rm_feat.assert_called_once_with(mock_vsid_settings)
mock_add_feat.assert_called_once_with(mock_port_alloc,
mock_vsid_settings)
@mock.patch.object(utilsv2.HyperVUtilsV2, '_add_virt_feature')
def test_set_vswitch_port_vsid_already_set(self, mock_add_feat):
mock_port_alloc = self._mock_get_switch_port_alloc()
mock_vsid_settings = mock.MagicMock(VirtualSubnetId=mock.sentinel.vsid)
mock_port_alloc.associators.return_value = (mock_vsid_settings, True)
self._utils.set_vswitch_port_vsid(mock.sentinel.vsid,
mock.sentinel.switch_port_name)
self.assertFalse(mock_add_feat.called)
def test_get_setting_data(self):
self._utils._get_first_item = mock.MagicMock(return_value=None)

View File

@@ -10,3 +10,5 @@ oslo.i18n>=1.5.0 # Apache-2.0
oslo.log>=1.2.0 # Apache-2.0
oslo.serialization>=1.4.0 # Apache-2.0
oslo.utils>=1.4.0 # Apache-2.0
python-neutronclient