Initial stab at OnMetal backend driver.

JIRA:NCP-1769
This commit is contained in:
Brad Morgan
2015-11-09 11:25:35 -06:00
parent de378ce04c
commit c73011274a
19 changed files with 1899 additions and 55 deletions

View File

@@ -49,7 +49,11 @@ EXTENDED_ATTRIBUTES_2_0 = {
"is_visible": True},
"network_plugin": {"allow_post": True, "allow_put": False,
"enforce_policy": True,
"is_visible": False, "default": ''}}}
"is_visible": False, "default": ''},
"instance_node_id": {"allow_post": True, "allow_put": False,
"default": '', "is_visible": False},
}
}
class Ports_quark(extensions.ExtensionDescriptor):

View File

@@ -641,8 +641,8 @@ def network_delete(context, network):
context.session.delete(network)
def subnet_find_ordered_by_most_full(context, net_id, lock_subnets=True,
**filters):
def _subnet_find_ordered_by_used_ips(context, net_id, lock_subnets=True,
order="asc", unused=False, **filters):
count = sql_func.count(models.IPAddress.address).label("count")
size = (models.Subnet.last_ip - models.Subnet.first_ip)
query = context.session.query(models.Subnet, count)
@@ -651,9 +651,16 @@ def subnet_find_ordered_by_most_full(context, net_id, lock_subnets=True,
query = query.filter_by(do_not_use=False)
query = query.outerjoin(models.Subnet.generated_ips)
query = query.group_by(models.Subnet.id)
query = query.order_by(
asc(models.Subnet.ip_version),
asc(size - count))
query = query.order_by(asc(models.Subnet.ip_version))
if unused: # find unused subnets
query = query.having(count == 0)
else: # otherwise, order used subnets
if order == "desc":
query = query.order_by(desc(size - count))
else: # default acending
query = query.order_by(asc(size - count))
query = query.filter(models.Subnet.network_id == net_id)
if "ip_version" in filters:
@@ -667,6 +674,24 @@ def subnet_find_ordered_by_most_full(context, net_id, lock_subnets=True,
return query
def subnet_find_ordered_by_most_full(context, net_id, lock_subnets=True,
**filters):
return _subnet_find_ordered_by_used_ips(
context, net_id, lock_subnets=lock_subnets, order="asc", **filters)
def subnet_find_ordered_by_least_full(context, net_id, lock_subnets=True,
**filters):
return _subnet_find_ordered_by_used_ips(
context, net_id, lock_subnets=lock_subnets, order="desc", **filters)
def subnet_find_unused(context, net_id, lock_subnets=True,
**filters):
return _subnet_find_ordered_by_used_ips(
context, net_id, lock_subnets=lock_subnets, unused=True, **filters)
def subnet_update_next_auto_assign_ip(context, subnet):
query = context.session.query(models.Subnet)
query = query.filter(models.Subnet.id == subnet["id"])

View File

@@ -36,6 +36,12 @@ class BaseDriver(object):
def get_connection(self):
LOG.info("get_connection")
def select_ipam_strategy(self, network_id, network_strategy, **kwargs):
LOG.info("Selecting IPAM strategy for network_id:%s "
"network_strategy:%s" % (network_id, network_strategy))
LOG.info("Selected IPAM strategy: %s" % (network_strategy))
return network_strategy
def create_network(self, context, network_name, tags=None,
network_id=None, **kwargs):
LOG.info("create_network %s %s %s" % (context, network_name,

View File

@@ -0,0 +1,542 @@
# Copyright 2013 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Ironic agent driver for Quark.
"""
import netaddr
from neutron.common import exceptions
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from quark.drivers import base
from quark import utils
import json
from quark import network_strategy
STRATEGY = network_strategy.STRATEGY
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
IRONIC_IPAM_STRATEGIES = {
"provider": {
"default": "IRONIC_ANY",
"overrides": {
"BOTH_REQUIRED": "IRONIC_BOTH_REQUIRED",
"BOTH": "IRONIC_BOTH",
"ANY": "IRONIC_ANY"
}
},
"tenant": {
"overrides": {}
}
}
ironic_opts = [
# client connection info
cfg.StrOpt("ironic_client", default="neutronclient.v2_0.client.Client",
help=("Class of the client used to communicate with the "
"Ironic agent")),
cfg.StrOpt('endpoint_url',
default='http://127.0.0.1:9697',
help='URL for connecting to neutron'),
cfg.IntOpt('timeout',
default=30,
help='Timeout value for connecting to neutron in seconds.'),
cfg.BoolOpt('insecure',
default=False,
help='If set, ignore any SSL validation issues'),
cfg.StrOpt('ca_cert',
help=('Location of CA certificates file to use for '
'neutron client requests.')),
cfg.StrOpt('password',
help='Password for connecting to neutron.', secret=True),
cfg.StrOpt('tenant_name',
help='Tenant name for connecting to neutron.'),
cfg.StrOpt('tenant_id',
help='Tenant id for connecting to neutron.'),
cfg.StrOpt('auth_url',
default='http://localhost:5000/v2.0',
help='Authorization URL for connecting to neutron.'),
cfg.StrOpt('auth_strategy',
default='keystone',
help='Authorization strategy for connecting to neutron.'),
# client retry options
cfg.IntOpt("operation_retries",
default=3,
help="Number of times to attempt client operations."),
cfg.IntOpt("operation_delay",
default=0,
help="Base seconds to wait between client attempts."),
cfg.IntOpt("operation_backoff",
default=0,
help="Base seconds for client exponential backoff"),
# conf options
cfg.StrOpt("ironic_ipam_strategies",
default=json.dumps(IRONIC_IPAM_STRATEGIES),
help="Default IPAM strategies and overrides for this driver")
]
CONF.register_opts(ironic_opts, "IRONIC")
class FakeIronicClient(object):
def __init__(self, *args, **kwargs):
pass
def create_port(self, *args, **kwargs):
return {"port": {"vlan_id": 500, "id": "fake_uuid"}}
def delete_port(self, *args, **kwargs):
return
class IronicException(exceptions.NeutronException):
message = "ironic driver error: %(msg)s"
class IronicDriver(base.BaseDriver):
def __init__(self):
self._client = None
self._ipam_strategies = None
super(IronicDriver, self).__init__()
@classmethod
def get_name(cls):
return "IRONIC"
def load_config(self):
LOG.info("Loading Ironic settings.")
self._client_cls = importutils.import_class(CONF.IRONIC.ironic_client)
self._client = self._get_client()
self._ipam_strategies = self._parse_ironic_ipam_strategies()
LOG.info("Ironic Driver config loaded. Client: %s"
% (CONF.IRONIC.endpoint_url))
def _get_client(self):
params = {
'endpoint_url': CONF.IRONIC.endpoint_url,
'timeout': CONF.IRONIC.timeout,
'insecure': CONF.IRONIC.insecure,
'ca_cert': CONF.IRONIC.ca_cert,
'auth_strategy': CONF.IRONIC.auth_strategy,
'tenant_name': CONF.IRONIC.tenant_name,
'tenant_id': CONF.IRONIC.tenant_id,
'password': CONF.IRONIC.password
}
return self._client_cls(**params)
def _parse_ironic_ipam_strategies(self):
strategies = json.loads(CONF.IRONIC.ironic_ipam_strategies)
for net_type in ['provider', 'tenant']:
strategy = strategies.get(net_type, {})
default = strategy.get("default")
# provider nets must specify a default IPAM strategy
if net_type == 'provider':
if not default:
raise Exception("ironic_ipam_strategies must have a "
"default 'provider' strategy.")
return strategies
def select_ipam_strategy(self, network_id, network_strategy, **kwargs):
"""Return relevant IPAM strategy name.
:param network_id: neutron network id.
:param network_strategy: default strategy for the network.
NOTE(morgabra) This feels like a hack but I can't think of a better
idea. The root problem is we can now attach ports to networks with
a different backend driver/ipam strategy than the network speficies.
We handle the the backend driver part with allowing network_plugin to
be specified for port objects. This works pretty well because nova or
whatever knows when we are hooking up an Ironic node so it can pass
along that key during port_create().
IPAM is a little trickier, especially in Ironic's case, because we
*must* use a specific IPAM for provider networks. There isn't really
much of an option other than involve the backend driver when selecting
the IPAM strategy.
"""
LOG.info("Selecting IPAM strategy for network_id:%s "
"network_strategy:%s" % (network_id, network_strategy))
net_type = "tenant"
if STRATEGY.is_provider_network(network_id):
net_type = "provider"
strategy = self._ipam_strategies.get(net_type, {})
default = strategy.get("default")
overrides = strategy.get("overrides", {})
# If we override a particular strategy explicitly, we use it.
if network_strategy in overrides:
LOG.info("Selected overridden IPAM strategy: %s"
% (overrides[network_strategy]))
return overrides[network_strategy]
# Otherwise, we are free to use an explicit default.
if default:
LOG.info("Selected default IPAM strategy for tenant "
"network: %s" % (default))
return default
# Fallback to the network-specified IPAM strategy
LOG.info("Selected network strategy for tenant "
"network: %s" % (network_strategy))
return network_strategy
def _make_subnet_dict(self, subnet):
dns_nameservers = [str(netaddr.IPAddress(dns["ip"]))
for dns in subnet.get("dns_nameservers")]
host_routes = [{"destination": r["cidr"],
"nexthop": r["gateway"]} for r in subnet["routes"]]
res = {
"id": subnet.get("id"),
"name": subnet.get("name"),
"tenant_id": subnet.get("tenant_id"),
"dns_nameservers": dns_nameservers,
"host_routes": host_routes,
"cidr": subnet.get("cidr"),
"gateway_ip": subnet.get("gateway_ip")
}
return res
def _make_fixed_ip_dict(self, context, address):
return {"subnet": self._make_subnet_dict(address["subnet"]),
"ip_address": address["address_readable"]}
@utils.retry_loop(CONF.IRONIC.operation_retries,
delay=CONF.IRONIC.operation_delay,
backoff=CONF.IRONIC.operation_backoff)
def _create_port(self, context, body):
try:
return self._client.create_port(body={"port": body})
except Exception as e:
msg = "failed to create downstream port. Exception: %s" % (str(e))
LOG.exception(msg)
raise
def _get_base_network_info(self, context, network_id, base_net_driver):
"""Return a dict of extra network information.
:param context: neutron request context.
:param network_id: neturon network id.
:param net_driver: network driver associated with network_id.
:raises IronicException: Any unexpected data fetching failures will
be logged and IronicException raised.
This driver can attach to networks managed by other drivers. We may
need some information from these drivers, or otherwise inform
downstream about the type of network we are attaching to. We can
make these decisions here.
"""
driver_name = base_net_driver.get_name()
net_info = {"network_type": driver_name}
LOG.debug('_get_base_network_info: %s %s'
% (driver_name, network_id))
# If the driver is NVP, we need to look up the lswitch id we should
# be attaching to.
if driver_name == 'NVP':
LOG.debug('looking up lswitch ids for network %s'
% (network_id))
lswitch_ids = base_net_driver.get_lswitch_ids_for_network(
context, network_id)
if not lswitch_ids or len(lswitch_ids) > 1:
msg = ('lswitch id lookup failed, %s ids found.'
% (len(lswitch_ids)))
LOG.error(msg)
raise IronicException(msg)
lswitch_id = lswitch_ids.pop()
LOG.info('found lswitch for network %s: %s'
% (network_id, lswitch_id))
net_info['lswitch_id'] = lswitch_id
LOG.debug('_get_base_network_info finished: %s %s %s'
% (driver_name, network_id, net_info))
return net_info
def create_port(self, context, network_id, port_id, **kwargs):
"""Create a port.
:param context: neutron api request context.
:param network_id: neutron network id.
:param port_id: neutron port id.
:param kwargs:
required keys - device_id: neutron port device_id (instance_id)
instance_node_id: nova hypervisor host id
mac_address: neutron port mac address
base_net_driver: the base network driver
optional keys - addresses: list of allocated IPAddress models
security_groups: list of associated security groups
:raises IronicException: If the client is unable to create the
downstream port for any reason, the exception will be logged
and IronicException raised.
"""
LOG.info("create_port %s %s %s" % (context.tenant_id, network_id,
port_id))
# sanity check
if not kwargs.get('base_net_driver'):
raise IronicException(msg='base_net_driver required.')
base_net_driver = kwargs['base_net_driver']
if not kwargs.get('device_id'):
raise IronicException(msg='device_id required.')
device_id = kwargs['device_id']
if not kwargs.get('instance_node_id'):
raise IronicException(msg='instance_node_id required.')
instance_node_id = kwargs['instance_node_id']
if not kwargs.get('mac_address'):
raise IronicException(msg='mac_address is required.')
mac_address = str(netaddr.EUI(kwargs["mac_address"]["address"]))
mac_address = mac_address.replace('-', ':')
# TODO(morgabra): Change this when we enable security groups.
if kwargs.get('security_groups'):
msg = 'ironic driver does not support security group operations.'
raise IronicException(msg=msg)
# unroll the given address models into a fixed_ips list we can
# pass downstream
fixed_ips = []
addresses = kwargs.get('addresses')
if not isinstance(addresses, list):
addresses = [addresses]
for address in addresses:
fixed_ips.append(self._make_fixed_ip_dict(context, address))
body = {
"id": port_id,
"network_id": network_id,
"device_id": device_id,
"device_owner": kwargs.get('device_owner', ''),
"tenant_id": context.tenant_id or "quark",
"mac_address": mac_address,
"fixed_ips": fixed_ips,
"switch:hardware_id": instance_node_id,
"dynamic_network": not STRATEGY.is_provider_network(network_id)
}
net_info = self._get_base_network_info(
context, network_id, base_net_driver)
body.update(net_info)
try:
LOG.info("creating downstream port: %s" % (body))
port = self._create_port(context, body)
LOG.info("created downstream port: %s" % (port))
return {"uuid": port['port']['id'],
"vlan_id": port['port']['vlan_id']}
except Exception as e:
msg = "failed to create downstream port. Exception: %s" % (e)
raise IronicException(msg=msg)
def update_port(self, context, port_id, **kwargs):
"""Update a port.
:param context: neutron api request context.
:param port_id: neutron port id.
:param kwargs: optional kwargs.
:raises IronicException: If the client is unable to update the
downstream port for any reason, the exception will be logged
and IronicException raised.
TODO(morgabra) It does not really make sense in the context of Ironic
to allow updating ports. fixed_ips and mac_address are burned in the
configdrive on the host, and we otherwise cannot migrate a port between
instances. Eventually we will need to support security groups, but for
now it's a no-op on port data changes, and we need to rely on the
API/Nova to not allow updating data on active ports.
"""
LOG.info("update_port %s %s" % (context.tenant_id, port_id))
# TODO(morgabra): Change this when we enable security groups.
if kwargs.get("security_groups"):
msg = 'ironic driver does not support security group operations.'
raise IronicException(msg=msg)
return {"uuid": port_id}
@utils.retry_loop(CONF.IRONIC.operation_retries,
delay=CONF.IRONIC.operation_delay,
backoff=CONF.IRONIC.operation_backoff)
def _delete_port(self, context, port_id):
try:
return self._client.delete_port(port_id)
except Exception as e:
# This doesn't get wrapped by the client unfortunately.
if "404 not found" in str(e).lower():
LOG.error("port %s not found downstream. ignoring delete."
% (port_id))
return
msg = ("failed to delete downstream port. "
"exception: %s" % (e))
LOG.exception(msg)
raise
def delete_port(self, context, port_id, **kwargs):
"""Delete a port.
:param context: neutron api request context.
:param port_id: neutron port id.
:param kwargs: optional kwargs.
:raises IronicException: If the client is unable to delete the
downstream port for any reason, the exception will be logged
and IronicException raised.
"""
LOG.info("delete_port %s %s" % (context.tenant_id, port_id))
try:
self._delete_port(context, port_id)
LOG.info("deleted downstream port: %s" % (port_id))
except Exception:
LOG.error("failed deleting downstream port, it is now "
"orphaned! port_id: %s" % (port_id))
def diag_port(self, context, port_id, **kwargs):
"""Diagnose a port.
:param context: neutron api request context.
:param port_id: neutron port id.
:param kwargs: optional kwargs.
:raises IronicException: If the client is unable to fetch the
downstream port for any reason, the exception will be
logged and IronicException raised.
"""
LOG.info("diag_port %s" % port_id)
try:
port = self._client.show_port(port_id)
except Exception as e:
msg = "failed fetching downstream port: %s" % (str(e))
LOG.exception(msg)
raise IronicException(msg=msg)
return {"downstream_port": port}
def create_network(self, *args, **kwargs):
"""Create a network.
:raises NotImplementedError: This driver does not manage networks.
NOTE: This is a no-op in the base driver, but this raises here as to
explicitly disallow network operations in case of a misconfiguration.
"""
raise NotImplementedError('ironic driver does not support '
'network operations.')
def delete_network(self, *args, **kwargs):
"""Delete a network.
:raises NotImplementedError: This driver does not manage networks.
NOTE: This is a no-op in the base driver, but this raises here as to
explicitly disallow network operations in case of a misconfiguration.
"""
raise NotImplementedError('ironic driver does not support '
'network operations.')
def diag_network(self, *args, **kwargs):
"""Diagnose a network.
:raises NotImplementedError: This driver does not manage networks.
NOTE: This is a no-op in the base driver, but this raises here as to
explicitly disallow network operations in case of a misconfiguration.
"""
raise NotImplementedError('ironic driver does not support '
'network operations.')
def create_security_group(self, context, group_name, **group):
"""Create a security group.
:raises NotImplementedError: This driver does not implement security
groups.
NOTE: Security groups will be supported in the future, but for now
they are explicitly disallowed.
"""
raise NotImplementedError('ironic driver does not support '
'security group operations.')
def delete_security_group(self, context, group_id, **kwargs):
"""Delete a security group.
:raises NotImplementedError: This driver does not implement security
groups.
NOTE: Security groups will be supported in the future, but for now
they are explicitly disallowed.
"""
raise NotImplementedError('ironic driver does not support '
'security group operations.')
def update_security_group(self, context, group_id, **group):
"""Update a security group.
:raises NotImplementedError: This driver does not implement security
groups.
NOTE: Security groups will be supported in the future, but for now
they are explicitly disallowed.
"""
raise NotImplementedError('ironic driver does not support '
'security group operations.')
def create_security_group_rule(self, context, group_id, rule):
"""Create a security group rule.
:raises NotImplementedError: This driver does not implement security
groups.
NOTE: Security groups will be supported in the future, but for now
they are explicitly disallowed.
"""
raise NotImplementedError('ironic driver does not support '
'security group operations.')
def delete_security_group_rule(self, context, group_id, rule):
"""Delete a security group rule.
:raises NotImplementedError: This driver does not implement security
groups.
NOTE: Security groups will be supported in the future, but for now
they are explicitly disallowed.
"""
raise NotImplementedError('ironic driver does not support '
'security group operations.')

View File

@@ -630,6 +630,16 @@ class NVPDriver(base.BaseDriver):
LOG.exception("Unexpected return from NVP: %s" % res)
raise
def get_lswitch_ids_for_network(self, context, network_id):
"""Public interface for fetching lswitch ids for a given network.
NOTE(morgabra) This is here because calling private methods
from outside the class feels wrong, and we need to be able to
fetch lswitch ids for use in other drivers.
"""
lswitches = self._lswitches_for_network(context, network_id).results()
return [s['uuid'] for s in lswitches["results"]]
def _lswitches_for_network(self, context, network_id):
with self.get_connection() as connection:
query = connection.lswitch().query()

View File

@@ -226,6 +226,16 @@ class OptimizedNVPDriver(NVPDriver):
context.session.add(new_switch)
return new_switch
def get_lswitch_ids_for_network(self, context, network_id):
"""Public interface for fetching lswitch ids for a given network.
NOTE(morgabra) This is here because calling private methods
from outside the class feels wrong, and we need to be able to
fetch lswitch ids for use in other drivers.
"""
lswitches = self._lswitches_for_network(context, network_id)
return [s['nvp_id'] for s in lswitches]
def _lswitches_for_network(self, context, network_id):
switches = context.session.query(LSwitch).filter(
LSwitch.network_id == network_id).all()

View File

@@ -14,6 +14,7 @@
# under the License.
from quark.drivers import base
from quark.drivers import ironic_driver as ironic
from quark.drivers import optimized_nvp_driver as optnvp
from quark.drivers.registry_base import DriverRegistryBase
from quark.drivers import unmanaged
@@ -30,7 +31,8 @@ class DriverRegistry(DriverRegistryBase):
self.drivers.update({
base.BaseDriver.get_name(): base.BaseDriver(),
optnvp.OptimizedNVPDriver.get_name(): optnvp.OptimizedNVPDriver(),
unmanaged.UnmanagedDriver.get_name(): unmanaged.UnmanagedDriver()})
unmanaged.UnmanagedDriver.get_name(): unmanaged.UnmanagedDriver(),
ironic.IronicDriver.get_name(): ironic.IronicDriver()})
# You may optionally specify a port-level driver name that will
# be used intead of the underlying network driver. This map determines
@@ -40,7 +42,13 @@ class DriverRegistry(DriverRegistryBase):
# specified to be used with networks that use "MY_OTHER_DRIVER",
# but *not* the inverse.
# Note that drivers are automatically compatible with themselves.
self.port_driver_compat_map = {}
self.port_driver_compat_map = {
ironic.IronicDriver.get_name(): [
base.BaseDriver.get_name(),
optnvp.OptimizedNVPDriver.get_name(),
unmanaged.UnmanagedDriver.get_name()
]
}
def get_driver(self, net_driver, port_driver=None):
LOG.info("Selecting driver for net_driver:%s "
@@ -54,7 +62,7 @@ class DriverRegistry(DriverRegistryBase):
# Net drivers are compatible with themselves
if port_driver == net_driver:
LOG.info("Selecting port_driver:%s" % (port_driver))
LOG.info("Selected port_driver:%s" % (port_driver))
return self.drivers[port_driver]
# Check port_driver is compatible with the given net_driver
@@ -64,11 +72,11 @@ class DriverRegistry(DriverRegistryBase):
"underlying network driver %s."
% (port_driver, net_driver))
LOG.info("Selecting port_driver:%s" % (port_driver))
LOG.info("Selected port_driver:%s" % (port_driver))
return self.drivers[port_driver]
elif net_driver in self.drivers:
LOG.info("Selecting net_driver:%s" % (net_driver))
LOG.info("Selected net_driver:%s" % (net_driver))
return self.drivers[net_driver]
raise Exception("Driver %s is not registered." % net_driver)

View File

@@ -15,7 +15,9 @@
from oslo_log import log as logging
from quark.drivers import base
from quark.drivers import security_groups as sg_driver
from quark import network_strategy
@@ -23,7 +25,7 @@ STRATEGY = network_strategy.STRATEGY
LOG = logging.getLogger(__name__)
class UnmanagedDriver(object):
class UnmanagedDriver(base.BaseDriver):
"""Unmanaged network driver.
Returns a bridge...

View File

@@ -995,12 +995,63 @@ class QuarkIpamBOTHREQ(QuarkIpamBOTH):
return subnets
class IronicIpam(QuarkIpam):
"""IPAM base class for the Ironic driver.
The idea here is that there are many small subnets created for a
particular segment for a provider network. And the Ironic IPAM
family selects unused ones, and only allows a single allocation
per subnet.
"""
def _select_subnet(self, context, net_id, ip_address, segment_id,
subnet_ids, **filters):
lock_subnets = True
select_api = db_api.subnet_find_unused
subnets = select_api(context, net_id, lock_subnets=lock_subnets,
segment_id=segment_id, scope=db_api.ALL,
subnet_id=subnet_ids, **filters)
if not subnets:
LOG.info("No subnets found given the search criteria!")
return
for subnet, ips_in_subnet in subnets:
# make sure we don't select subnets that have allocated ips.
if ips_in_subnet:
continue
yield subnet, ips_in_subnet
class IronicIpamANY(IronicIpam, QuarkIpamANY):
@classmethod
def get_name(self):
return "IRONIC_ANY"
class IronicIpamBOTH(IronicIpam, QuarkIpamBOTH):
@classmethod
def get_name(self):
return "IRONIC_BOTH"
class IronicIpamBOTHREQ(IronicIpam, QuarkIpamBOTHREQ):
@classmethod
def get_name(self):
return "IRONIC_BOTH_REQUIRED"
class IpamRegistry(object):
def __init__(self):
self.strategies = {
QuarkIpamANY.get_name(): QuarkIpamANY(),
QuarkIpamBOTH.get_name(): QuarkIpamBOTH(),
QuarkIpamBOTHREQ.get_name(): QuarkIpamBOTHREQ()}
QuarkIpamBOTHREQ.get_name(): QuarkIpamBOTHREQ(),
IronicIpamANY.get_name(): IronicIpamANY(),
IronicIpamBOTH.get_name(): IronicIpamBOTH(),
IronicIpamBOTHREQ.get_name(): IronicIpamBOTHREQ()
}
def is_valid_strategy(self, strategy_name):
if strategy_name in self.strategies:

View File

@@ -58,6 +58,28 @@ def _get_net_driver(network, port=None):
msg="invalid network_plugin: %s" % e)
def _get_ipam_driver(network, port=None):
network_id = network["id"]
network_strategy = network["ipam_strategy"]
# Ask the net driver for a IPAM strategy to use
# with the given network/default strategy.
net_driver = _get_net_driver(network, port=port)
strategy = net_driver.select_ipam_strategy(
network_id, network_strategy)
# If the driver has no opinion about which strategy to use,
# we use the one specified by the network.
if not strategy:
strategy = network_strategy
try:
return ipam.IPAM_REGISTRY.get_strategy(strategy)
except Exception as e:
raise exceptions.BadRequest(resource="ports",
msg="invalid ipam_strategy: %s" % e)
# NOTE(morgabra) Backend driver operations return a lot of stuff. We use a
# small subset of this data, so we filter out things we don't care about
# so we can avoid any collisions with real port data.
@@ -119,7 +141,8 @@ def create_port(context, port):
port_attrs = port["port"]
admin_only = ["mac_address", "device_owner", "bridge", "admin_state_up",
"use_forbidden_mac_range", "network_plugin"]
"use_forbidden_mac_range", "network_plugin",
"instance_node_id"]
utils.filter_body(context, port_attrs, admin_only=admin_only)
port_attrs = port["port"]
@@ -128,9 +151,17 @@ def create_port(context, port):
"use_forbidden_mac_range", False)
segment_id = utils.pop_param(port_attrs, "segment_id")
fixed_ips = utils.pop_param(port_attrs, "fixed_ips")
if "device_id" not in port_attrs:
port_attrs['device_id'] = ""
device_id = port_attrs['device_id']
# NOTE(morgabra) This should be instance.node from nova, only needed
# for ironic_driver.
if "instance_node_id" not in port_attrs:
port_attrs['instance_node_id'] = ""
instance_node_id = port_attrs['instance_node_id']
net_id = port_attrs["network_id"]
port_id = uuidutils.generate_uuid()
@@ -171,9 +202,18 @@ def create_port(context, port):
if not segment_id:
raise q_exc.AmbiguousNetworkId(net_id=net_id)
ipam_driver = ipam.IPAM_REGISTRY.get_strategy(net["ipam_strategy"])
network_plugin = utils.pop_param(port_attrs, "network_plugin")
if not network_plugin:
network_plugin = net["network_plugin"]
port_attrs["network_plugin"] = network_plugin
ipam_driver = _get_ipam_driver(net, port=port_attrs)
net_driver = _get_net_driver(net, port=port_attrs)
# NOTE(morgabra) It's possible that we select a driver different than
# the one specified by the network. However, we still might need to use
# this for some operations, so we also fetch it and pass it along to
# the backend driver we are actually using.
base_net_driver = _get_net_driver(net)
# TODO(anyone): security groups are not currently supported on port create.
# Please see JIRA:NCP-801
@@ -242,10 +282,15 @@ def create_port(context, port):
@cmd_mgr.do
def _allocate_backend_port(mac, addresses, net, port_id):
backend_port = net_driver.create_port(context, net["id"],
backend_port = net_driver.create_port(
context, net["id"],
port_id=port_id,
security_groups=group_ids,
device_id=device_id)
device_id=device_id,
instance_node_id=instance_node_id,
mac_address=mac,
addresses=addresses,
base_net_driver=base_net_driver)
_filter_backend_port(backend_port)
return backend_port
@@ -253,7 +298,10 @@ def create_port(context, port):
def _allocate_back_port_undo(backend_port):
LOG.info("Rolling back backend port...")
try:
net_driver.delete_port(context, backend_port["uuid"])
backend_port_uuid = None
if backend_port:
backend_port_uuid = backend_port.get("uuid")
net_driver.delete_port(context, backend_port_uuid)
except Exception:
LOG.exception(
"Couldn't rollback backend port %s" % backend_port)
@@ -413,6 +461,7 @@ def update_port(context, id, port):
# NOTE(morgabra) Updating network_plugin on port objects is explicitly
# disallowed in the api, so we use whatever exists in the db.
net_driver = _get_net_driver(port_db.network, port=port_db)
base_net_driver = _get_net_driver(port_db.network)
# TODO(anyone): What do we want to have happen here if this fails? Is it
# ok to continue to keep the IPs but fail to apply security
@@ -425,6 +474,7 @@ def update_port(context, id, port):
net_driver.update_port(context, port_id=port_db["backend_key"],
mac_address=port_db["mac_address"],
device_id=port_db["device_id"],
base_net_driver=base_net_driver,
**kwargs)
port_dict["security_groups"] = security_group_mods
@@ -545,15 +595,16 @@ def delete_port(context, id):
backend_key = port["backend_key"]
mac_address = netaddr.EUI(port["mac_address"]).value
ipam_driver = ipam.IPAM_REGISTRY.get_strategy(
port["network"]["ipam_strategy"])
ipam_driver = _get_ipam_driver(port["network"], port=port)
ipam_driver.deallocate_mac_address(context, mac_address)
ipam_driver.deallocate_ips_by_port(
context, port, ipam_reuse_after=CONF.QUARK.ipam_reuse_after)
net_driver = _get_net_driver(port.network, port=port)
net_driver = _get_net_driver(port["network"], port=port)
base_net_driver = _get_net_driver(port["network"])
net_driver.delete_port(context, backend_key, device_id=port["device_id"],
mac_address=port["mac_address"])
mac_address=port["mac_address"],
base_net_driver=base_net_driver)
with context.session.begin():
db_api.port_delete(context, port)

View File

@@ -44,6 +44,11 @@ quark_view_opts = [
default=True,
help=_('Controls whether or not to show ip_policy_id for'
'subnets')),
cfg.BoolOpt('show_provider_subnet_ids',
default=True,
help=_('Controls whether or not to show the provider subnet '
'id specified in the network strategy or use the '
'real id.')),
]
CONF.register_opts(quark_view_opts, "QUARK")
@@ -207,8 +212,12 @@ def _make_port_address_dict(ip, port, fields=None):
enabled = ip.enabled_for_port(port)
subnet_id = ip.get("subnet_id")
net_id = ip.get("network_id")
if STRATEGY.is_provider_network(net_id):
subnet_id = STRATEGY.get_provider_subnet_id(net_id, ip["version"])
subnet_id = ip.get("subnet_id")
show_provider_subnet_ids = CONF.QUARK.show_provider_subnet_ids
if STRATEGY.is_provider_network(net_id) and show_provider_subnet_ids:
subnet_id = STRATEGY.get_provider_subnet_id(
net_id, ip["version"])
ip_addr = {"subnet_id": subnet_id,
"ip_address": ip.formatted(),

View File

@@ -92,6 +92,13 @@ class QuarkFindSubnetAllocationCount(QuarkIpamBaseFunctionalTest):
for subnet in subnets:
self.assertIn(subnet[0]["cidr"], cidrs)
subnets = db_api.subnet_find_ordered_by_least_full(
self.context, net['id'], segment_id=None,
scope=db_api.ALL).all()
self.assertEqual(len(subnets), 3)
for subnet in subnets:
self.assertIn(subnet[0]["cidr"], cidrs)
def test_ordering_subnets_find_allocation_counts_when_counts_unequal(self):
models = []
cidrs = ["0.0.0.0/31", "1.1.1.0/31", "2.2.2.0/30"]
@@ -114,6 +121,17 @@ class QuarkFindSubnetAllocationCount(QuarkIpamBaseFunctionalTest):
self.assertEqual(subnets[2][0].cidr, "2.2.2.0/30")
self.assertEqual(subnets[2][1], 1)
subnets = db_api.subnet_find_ordered_by_least_full(
self.context, net['id'], segment_id=None,
scope=db_api.ALL).all()
self.assertEqual(len(subnets), 3)
self.assertEqual(subnets[0][0].cidr, "2.2.2.0/30")
self.assertEqual(subnets[0][1], 1)
self.assertIn(subnets[1][0].cidr, subnets_with_same_ips_used)
self.assertEqual(subnets[1][1], 0)
self.assertIn(subnets[2][0].cidr, subnets_with_same_ips_used)
self.assertEqual(subnets[2][1], 0)
def test_ordering_subnets_find_allocc_when_counts_unequal_size_equal(self):
models = []
cidrs = ["0.0.0.0/31", "1.1.1.0/31", "2.2.2.0/31"]
@@ -137,6 +155,17 @@ class QuarkFindSubnetAllocationCount(QuarkIpamBaseFunctionalTest):
self.assertEqual(subnets[2][0].cidr, "0.0.0.0/31")
self.assertEqual(subnets[2][1], 0)
subnets = db_api.subnet_find_ordered_by_least_full(
self.context, net['id'], segment_id=None,
scope=db_api.ALL).all()
self.assertEqual(len(subnets), 3)
self.assertEqual(subnets[0][0].cidr, "0.0.0.0/31")
self.assertEqual(subnets[0][1], 0)
self.assertEqual(subnets[1][0].cidr, "1.1.1.0/31")
self.assertEqual(subnets[1][1], 1)
self.assertEqual(subnets[2][0].cidr, "2.2.2.0/31")
self.assertEqual(subnets[2][1], 2)
def test_ordering_subnets_ip_version(self):
"""Order by ip_version primarily.
@@ -157,6 +186,37 @@ class QuarkFindSubnetAllocationCount(QuarkIpamBaseFunctionalTest):
self.assertEqual(subnets[0][0].ip_version, 4)
self.assertEqual(subnets[1][0].ip_version, 6)
subnets = db_api.subnet_find_ordered_by_least_full(
self.context, net['id'], segment_id=None,
scope=db_api.ALL).all()
self.assertEqual(subnets[0][0].ip_version, 4)
self.assertEqual(subnets[1][0].ip_version, 6)
def test_ordering_subnets_find_unused(self):
models = []
cidrsv4 = ["0.0.0.0/31", "1.1.1.0/31", "2.2.2.0/31"]
cidrsv6 = ["fffc::/127", "fffd::/127"]
for version, cidrs in [(4, cidrsv4), (6, cidrsv6)]:
for cidr in cidrs:
last = netaddr.IPNetwork(cidr).last
models.append(self._create_models(cidr, version, last))
with self._fixtures(models) as net:
self._create_ip_address("2.2.2.1", 4, "2.2.2.0/31", net["id"])
self._create_ip_address("2.2.2.2", 4, "2.2.2.0/31", net["id"])
self._create_ip_address("1.1.1.1", 4, "1.1.1.0/31", net["id"])
self._create_ip_address("fffc::1", 6, "fffc::/127", net["id"])
subnets = db_api.subnet_find_unused(
self.context, net['id'], segment_id=None,
scope=db_api.ALL).all()
self.assertEqual(len(subnets), 2)
self.assertEqual(subnets[0][0].cidr, "0.0.0.0/31")
self.assertEqual(subnets[0][1], 0)
self.assertEqual(subnets[1][0].cidr, "fffd::/127")
self.assertEqual(subnets[1][1], 0)
def test_subnet_set_full(self):
cidr4 = "0.0.0.0/30" # 2 bits
net4 = netaddr.IPNetwork(cidr4)

View File

@@ -23,9 +23,11 @@ from neutron.common import exceptions
from oslo_config import cfg
from quark.db import models
from quark.drivers import registry
from quark import exceptions as q_exc
from quark import network_strategy
from quark.plugin_modules import ports as quark_ports
from quark import plugin_views
from quark import tags
from quark.tests import test_quark_plugin
@@ -169,6 +171,129 @@ class TestQuarkGetPorts(test_quark_plugin.TestQuarkPlugin):
self.assertEqual(result[key], expected[key])
class TestQuarkGetPortsProviderSubnetIds(test_quark_plugin.TestQuarkPlugin):
def setUp(self):
super(TestQuarkGetPortsProviderSubnetIds, self).setUp()
self.strategy = {
"1": {
"bridge": "publicnet",
"subnets": {
"4": "v4-provider-subnet-id",
"6": "v6-provider-subnet-id"
}
}
}
self.strategy_json = json.dumps(self.strategy)
self.old = plugin_views.STRATEGY
plugin_views.STRATEGY = network_strategy.JSONStrategy(
self.strategy_json)
cfg.CONF.set_override("default_net_strategy", self.strategy_json,
"QUARK")
def tearDown(self):
plugin_views.STRATEGY = self.old
def _port_associate_stub(self, ports, address, **kwargs):
if not isinstance(ports, list):
ports = [ports]
for port in ports:
assoc = models.PortIpAssociation()
assoc.port_id = port.id
assoc.ip_address_id = address.id
assoc.port = port
assoc.ip_address = address
assoc.enabled = address.address_type == "fixed"
return address
@contextlib.contextmanager
def _stubs(self, ports=None, addrs=None):
port_models = []
addr_models = None
if addrs:
addr_models = []
for address in addrs:
a = models.IPAddress(**address)
addr_models.append(a)
if isinstance(ports, list):
for port in ports:
port_model = models.Port(**port)
if addr_models:
port_model.ip_addresses = addr_models
for addr_model in addr_models:
self._port_associate_stub(
port_model, addr_model)
port_models.append(port_model)
elif ports is None:
port_models = None
else:
port_model = models.Port(**ports)
if addr_models:
port_model.ip_addresses = addr_models
for addr_model in addr_models:
self._port_associate_stub(
port_model, addr_model)
port_models = port_model
with contextlib.nested(
mock.patch("quark.db.api.port_find")
) as (port_find,):
port_find.return_value = port_models
yield
def test_port_show_with_provider_subnet_ids(self):
"""Prove provider subnets ids are shown on the port object."""
ip = dict(id=1, address=netaddr.IPAddress("192.168.1.100").value,
address_readable="192.168.1.100", subnet_id="1",
network_id="1", version=4, address_type="fixed")
port = dict(mac_address=int('AABBCCDDEEFF', 16), network_id="1",
tenant_id=self.context.tenant_id, device_id=2)
expected = {'status': "ACTIVE",
'device_owner': None,
'mac_address': 'AA:BB:CC:DD:EE:FF',
'network_id': "1",
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': [
{'subnet_id': 'v4-provider-subnet-id', 'enabled': True,
'ip_address': '192.168.1.100'}
],
'device_id': 2}
with self._stubs(ports=port, addrs=[ip]):
result = self.plugin.get_port(self.context, 1)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
def test_port_show_without_provider_subnet_ids(self):
"""Prove provider subnets ids are shown on the port object."""
cfg.CONF.set_override('show_provider_subnet_ids', False, 'QUARK')
self.addCleanup(
cfg.CONF.clear_override, 'show_provider_subnet_ids', 'QUARK')
ip = dict(id=1, address=netaddr.IPAddress("192.168.1.100").value,
address_readable="192.168.1.100", subnet_id="1",
network_id="1", version=4, address_type="fixed")
port = dict(mac_address=int('AABBCCDDEEFF', 16), network_id="1",
tenant_id=self.context.tenant_id, device_id=2)
expected = {'status': "ACTIVE",
'device_owner': None,
'mac_address': 'AA:BB:CC:DD:EE:FF',
'network_id': "1",
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': [
{'subnet_id': '1', 'enabled': True,
'ip_address': '192.168.1.100'}
],
'device_id': 2}
with self._stubs(ports=port, addrs=[ip]):
result = self.plugin.get_port(self.context, 1)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
class TestQuarkGetPortsByIPAddress(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, ports=None, addr=None):
@@ -865,7 +990,8 @@ class TestQuarkUpdatePortSecurityGroups(test_quark_plugin.TestQuarkPlugin):
self.context, port_id=port_dict["backend_key"],
mac_address=port_dict["mac_address"],
device_id=port_dict["device_id"],
security_groups=[])
security_groups=[],
base_net_driver=registry.DRIVER_REGISTRY.get_driver('BASE'))
def test_update_port_no_security_groups(self):
port_dict = {"id": 1, "mac_address": "AA:BB:CC:DD:EE:FF",
@@ -879,7 +1005,8 @@ class TestQuarkUpdatePortSecurityGroups(test_quark_plugin.TestQuarkPlugin):
driver_port_update.assert_called_once_with(
self.context, port_id=port_dict["backend_key"],
mac_address=port_dict["mac_address"],
device_id=port_dict["device_id"])
device_id=port_dict["device_id"],
base_net_driver=registry.DRIVER_REGISTRY.get_driver('BASE'))
def test_update_port_security_groups_no_device_id_raises(self):
with self._stubs(
@@ -1040,7 +1167,8 @@ class TestQuarkDeletePort(test_quark_plugin.TestQuarkPlugin):
self.assertTrue(db_port_del.called)
driver_port_del.assert_called_with(
self.context, "foo", mac_address=port["port"]["mac_address"],
device_id=port["port"]["device_id"])
device_id=port["port"]["device_id"],
base_net_driver=registry.DRIVER_REGISTRY.get_driver("BASE"))
def test_port_delete_port_not_found_fails(self):
with self._stubs(port=None) as (db_port_del, driver_port_del):
@@ -1155,11 +1283,11 @@ class TestPortDiagnose(test_quark_plugin.TestQuarkPlugin):
self.plugin.diagnose_port(self.context, 1, [])
class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
class TestPortDriverSelection(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, network=None, addr=None, mac=None,
compat_map=None, driver_res=None):
network["ipam_strategy"] = "ANY"
compat_map=None, driver_res=None, ipam="FOO"):
network["ipam_strategy"] = "FOO"
# Response from the backend driver
self.expected_bridge = "backend-drivers-bridge"
@@ -1169,17 +1297,26 @@ class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
# Mock out the driver registry
foo_driver = mock.Mock()
foo_driver.create_port.return_value = driver_res
foo_driver.select_ipam_strategy.return_value = "FOO"
bar_driver = mock.Mock()
bar_driver.create_port.return_value = driver_res
bar_driver.select_ipam_strategy.return_value = "BAR"
drivers = {"FOO": foo_driver,
"BAR": bar_driver}
compat_map = compat_map or {}
# Mock out the IPAM registry
foo_ipam = mock.Mock()
foo_ipam.allocate_ip_address.return_value = addr
foo_ipam.allocate_mac_address.return_value = mac
bar_ipam = mock.Mock()
bar_ipam.allocate_ip_address.return_value = addr
bar_ipam.allocate_mac_address.return_value = mac
ipam = {"FOO": foo_ipam, "BAR": bar_ipam}
with contextlib.nested(
mock.patch("quark.db.api.port_create"),
mock.patch("quark.db.api.network_find"),
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address"),
mock.patch("quark.ipam.QuarkIpam.allocate_mac_address"),
mock.patch("oslo_utils.uuidutils.generate_uuid"),
mock.patch("quark.plugin_views._make_port_dict"),
mock.patch("quark.db.api.port_count_all"),
@@ -1189,15 +1326,17 @@ class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
new_callable=mock.PropertyMock(return_value=drivers)),
mock.patch("quark.plugin_modules.ports.registry."
"DRIVER_REGISTRY.port_driver_compat_map",
new_callable=mock.PropertyMock(return_value=compat_map))
) as (port_create, net_find, alloc_ip, alloc_mac, gen_uuid, make_port,
port_count, limit_check, _, _):
new_callable=mock.PropertyMock(
return_value=compat_map)),
mock.patch("quark.plugin_modules.ports.ipam."
"IPAM_REGISTRY.strategies",
new_callable=mock.PropertyMock(return_value=ipam))
) as (port_create, net_find, gen_uuid, make_port,
port_count, limit_check, _, _, _):
net_find.return_value = network
alloc_ip.return_value = addr
alloc_mac.return_value = mac
gen_uuid.return_value = 1
port_count.return_value = 0
yield port_create, alloc_mac, net_find
yield (port_create, ipam, net_find)
def test_create_port_with_bad_network_plugin_fails(self):
network_dict = dict(id=1, tenant_id=self.context.tenant_id)
@@ -1215,7 +1354,7 @@ class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
port_models = port_model
with self._stubs(network=network, addr=ip,
mac=mac) as (port_create, alloc_mac, net_find):
mac=mac) as (port_create, ipam, net_find):
port_create.return_value = port_models
exc = "Driver FAIL is not registered."
@@ -1238,7 +1377,7 @@ class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
port_models = port_model
with self._stubs(network=network, addr=ip,
mac=mac) as (port_create, alloc_mac, net_find):
mac=mac) as (port_create, ipam, net_find):
port_create.return_value = port_models
exc = "Driver FAIL is not registered."
@@ -1262,7 +1401,7 @@ class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
port_models = port_model
with self._stubs(network=network, addr=ip,
mac=mac) as (port_create, alloc_mac, net_find):
mac=mac) as (port_create, ipam, net_find):
port_create.return_value = port_models
exc = ("Port driver BAR not allowed for underlying network "
@@ -1271,6 +1410,56 @@ class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
with self.assertRaisesRegexp(exceptions.BadRequest, exc):
self.plugin.create_port(admin_ctx, port)
def test_create_port_with_no_port_network_plugin(self):
network = dict(id=1, tenant_id=self.context.tenant_id,
network_plugin="FOO")
mac = dict(address="AA:BB:CC:DD:EE:FF")
port_name = "foobar"
ip = dict()
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
name=port_name, device_owner="quark_tests",
bridge="quark_bridge", admin_state_up=False))
expected_mac = "DE:AD:BE:EF:00:00"
expected_bridge = "new_bridge"
expected_device_owner = "new_device_owner"
expected_admin_state = "new_state"
port_create_dict = {}
port_create_dict["port"] = port["port"].copy()
port_create_dict["port"]["mac_address"] = expected_mac
port_create_dict["port"]["device_owner"] = expected_device_owner
port_create_dict["port"]["bridge"] = expected_bridge
port_create_dict["port"]["admin_state_up"] = expected_admin_state
admin_ctx = self.context.elevated()
with self._stubs(network=network, addr=ip,
mac=mac) as (port_create, ipam, net_find):
self.plugin.create_port(admin_ctx, port_create_dict)
ipam["BAR"].allocate_mac_address.assert_not_called()
ipam["BAR"].allocate_ip_address.assert_not_called()
ipam["FOO"].allocate_ip_address.assert_called_once_with(
admin_ctx, [], network["id"], 1,
cfg.CONF.QUARK.ipam_reuse_after,
segment_id=None, mac_address=mac)
ipam["FOO"].allocate_mac_address.assert_called_once_with(
admin_ctx, network["id"], 1,
cfg.CONF.QUARK.ipam_reuse_after,
mac_address=expected_mac, use_forbidden_mac_range=False)
port_create.assert_called_once_with(
admin_ctx, bridge=self.expected_bridge, uuid=1, name="foobar",
admin_state_up=expected_admin_state, network_id=1,
tenant_id="fake", id=1, device_owner=expected_device_owner,
mac_address=mac["address"], device_id=2, backend_key=1,
security_groups=[], addresses=[], instance_node_id='',
network_plugin="FOO")
def test_create_port_with_port_network_plugin(self):
network = dict(id=1, tenant_id=self.context.tenant_id,
network_plugin="FOO")
@@ -1299,10 +1488,18 @@ class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
admin_ctx = self.context.elevated()
with self._stubs(network=network, addr=ip,
mac=mac) as (port_create, alloc_mac, net_find):
mac=mac) as (port_create, ipam, net_find):
self.plugin.create_port(admin_ctx, port_create_dict)
alloc_mac.assert_called_once_with(
ipam["BAR"].allocate_mac_address.assert_not_called()
ipam["BAR"].allocate_ip_address.assert_not_called()
ipam["FOO"].allocate_ip_address.assert_called_once_with(
admin_ctx, [], network["id"], 1,
cfg.CONF.QUARK.ipam_reuse_after,
segment_id=None, mac_address=mac)
ipam["FOO"].allocate_mac_address.assert_called_once_with(
admin_ctx, network["id"], 1,
cfg.CONF.QUARK.ipam_reuse_after,
mac_address=expected_mac, use_forbidden_mac_range=False)
@@ -1312,7 +1509,7 @@ class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
admin_state_up=expected_admin_state, network_id=1,
tenant_id="fake", id=1, device_owner=expected_device_owner,
mac_address=mac["address"], device_id=2, backend_key=1,
security_groups=[], addresses=[],
security_groups=[], addresses=[], instance_node_id='',
network_plugin=expected_network_plugin)
def test_create_port_with_compatible_port_network_plugin(self):
@@ -1344,11 +1541,19 @@ class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
compat_map = {"BAR": ["FOO"]}
admin_ctx = self.context.elevated()
with self._stubs(network=network, addr=ip, mac=mac,
compat_map=compat_map) as (port_create, alloc_mac,
compat_map=compat_map) as (port_create, ipam,
net_find):
self.plugin.create_port(admin_ctx, port_create_dict)
alloc_mac.assert_called_once_with(
ipam["FOO"].allocate_mac_address.assert_not_called()
ipam["FOO"].allocate_ip_address.assert_not_called()
ipam["BAR"].allocate_ip_address.assert_called_once_with(
admin_ctx, [], network["id"], 1,
cfg.CONF.QUARK.ipam_reuse_after,
segment_id=None, mac_address=mac)
ipam["BAR"].allocate_mac_address.assert_called_once_with(
admin_ctx, network["id"], 1,
cfg.CONF.QUARK.ipam_reuse_after,
mac_address=expected_mac, use_forbidden_mac_range=False)
@@ -1358,9 +1563,109 @@ class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
admin_state_up=expected_admin_state, network_id=1,
tenant_id="fake", id=1, device_owner=expected_device_owner,
mac_address=mac["address"], device_id=2, backend_key=1,
security_groups=[], addresses=[],
security_groups=[], addresses=[], instance_node_id='',
network_plugin=expected_network_plugin)
def test_create_port_ipam_selection(self):
network = dict(id=1, tenant_id=self.context.tenant_id,
network_plugin="FOO")
mac = dict(address="AA:BB:CC:DD:EE:FF")
port_name = "foobar"
ip = dict()
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
name=port_name, device_owner="quark_tests",
bridge="quark_bridge", admin_state_up=False))
expected_mac = "DE:AD:BE:EF:00:00"
expected_bridge = "new_bridge"
expected_device_owner = "new_device_owner"
expected_admin_state = "new_state"
port_create_dict = {}
port_create_dict["port"] = port["port"].copy()
port_create_dict["port"]["mac_address"] = expected_mac
port_create_dict["port"]["device_owner"] = expected_device_owner
port_create_dict["port"]["bridge"] = expected_bridge
port_create_dict["port"]["admin_state_up"] = expected_admin_state
admin_ctx = self.context.elevated()
with self._stubs(network=network, addr=ip,
mac=mac) as (port_create, ipam, net_find):
self.plugin.create_port(admin_ctx, port_create_dict)
ipam["BAR"].allocate_mac_address.assert_not_called()
ipam["BAR"].allocate_ip_address.assert_not_called()
ipam["FOO"].allocate_ip_address.assert_called_once_with(
admin_ctx, [], network["id"], 1,
cfg.CONF.QUARK.ipam_reuse_after,
segment_id=None, mac_address=mac)
ipam["FOO"].allocate_mac_address.assert_called_once_with(
admin_ctx, network["id"], 1,
cfg.CONF.QUARK.ipam_reuse_after,
mac_address=expected_mac, use_forbidden_mac_range=False)
port_create.assert_called_once_with(
admin_ctx, bridge=self.expected_bridge, uuid=1, name="foobar",
admin_state_up=expected_admin_state, network_id=1,
tenant_id="fake", id=1, device_owner=expected_device_owner,
mac_address=mac["address"], device_id=2, backend_key=1,
security_groups=[], addresses=[], instance_node_id='',
network_plugin="FOO")
def test_create_port_ipam_selection_override_by_driver(self):
network = dict(id=1, tenant_id=self.context.tenant_id,
network_plugin="BAR")
mac = dict(address="AA:BB:CC:DD:EE:FF")
port_name = "foobar"
ip = dict()
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
name=port_name, device_owner="quark_tests",
bridge="quark_bridge", admin_state_up=False))
expected_mac = "DE:AD:BE:EF:00:00"
expected_bridge = "new_bridge"
expected_device_owner = "new_device_owner"
expected_admin_state = "new_state"
port_create_dict = {}
port_create_dict["port"] = port["port"].copy()
port_create_dict["port"]["mac_address"] = expected_mac
port_create_dict["port"]["device_owner"] = expected_device_owner
port_create_dict["port"]["bridge"] = expected_bridge
port_create_dict["port"]["admin_state_up"] = expected_admin_state
admin_ctx = self.context.elevated()
with self._stubs(network=network, addr=ip,
mac=mac, ipam="BAR") as (port_create, ipam, net_find):
self.plugin.create_port(admin_ctx, port_create_dict)
ipam["FOO"].allocate_mac_address.assert_not_called()
ipam["FOO"].allocate_ip_address.assert_not_called()
ipam["BAR"].allocate_ip_address.assert_called_once_with(
admin_ctx, [], network["id"], 1,
cfg.CONF.QUARK.ipam_reuse_after,
segment_id=None, mac_address=mac)
ipam["BAR"].allocate_mac_address.assert_called_once_with(
admin_ctx, network["id"], 1,
cfg.CONF.QUARK.ipam_reuse_after,
mac_address=expected_mac, use_forbidden_mac_range=False)
port_create.assert_called_once_with(
admin_ctx, bridge=self.expected_bridge, uuid=1, name="foobar",
admin_state_up=expected_admin_state, network_id=1,
tenant_id="fake", id=1, device_owner=expected_device_owner,
mac_address=mac["address"], device_id=2, backend_key=1,
security_groups=[], addresses=[], instance_node_id='',
network_plugin="BAR")
def test_create_port_network_plugin_response_no_uuid_raises(self):
network_dict = dict(id=1, tenant_id=self.context.tenant_id)
port_name = "foobar"
@@ -1421,11 +1726,19 @@ class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
admin_ctx = self.context.elevated()
with self._stubs(network=network, addr=ip,
mac=mac, driver_res=driver_res) as (port_create,
alloc_mac,
ipam,
net_find):
self.plugin.create_port(admin_ctx, port_create_dict)
alloc_mac.assert_called_once_with(
ipam["BAR"].allocate_mac_address.assert_not_called()
ipam["BAR"].allocate_ip_address.assert_not_called()
ipam["FOO"].allocate_ip_address.assert_called_once_with(
admin_ctx, [], network["id"], 1,
cfg.CONF.QUARK.ipam_reuse_after,
segment_id=None, mac_address=mac)
ipam["FOO"].allocate_mac_address.assert_called_once_with(
admin_ctx, network["id"], 1,
cfg.CONF.QUARK.ipam_reuse_after,
mac_address=expected_mac, use_forbidden_mac_range=False)
@@ -1435,7 +1748,8 @@ class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
admin_state_up=expected_admin_state, network_id=1,
tenant_id="fake", id=1, device_owner=expected_device_owner,
mac_address=mac["address"], device_id=2, backend_key=5,
security_groups=[], addresses=[], vlan_id=50)
security_groups=[], addresses=[], vlan_id=50,
network_plugin=network["network_plugin"], instance_node_id='')
class TestQuarkPortCreateFiltering(test_quark_plugin.TestQuarkPlugin):
@@ -1491,7 +1805,8 @@ class TestQuarkPortCreateFiltering(test_quark_plugin.TestQuarkPlugin):
self.context, addresses=[], network_id=network["id"],
tenant_id="fake", uuid=1, name="foobar",
mac_address=alloc_mac()["address"], backend_key=1, id=1,
security_groups=[], device_id=2)
security_groups=[], network_plugin='BASE',
device_id=2, instance_node_id='')
def test_create_port_attribute_filtering_admin(self):
network = dict(id=1, tenant_id=self.context.tenant_id)
@@ -1534,7 +1849,7 @@ class TestQuarkPortCreateFiltering(test_quark_plugin.TestQuarkPlugin):
tenant_id="fake", id=1, device_owner=expected_device_owner,
mac_address=mac["address"], device_id=2, backend_key=1,
security_groups=[], addresses=[],
network_plugin=expected_network_plugin)
network_plugin=expected_network_plugin, instance_node_id='')
class TestQuarkPortUpdateFiltering(test_quark_plugin.TestQuarkPlugin):

View File

@@ -28,6 +28,10 @@ class TestBaseDriver(test_base.TestBase):
def test_get_connection(self):
self.driver.get_connection()
def test_select_ipam_strategy(self):
strategy = self.driver.select_ipam_strategy(1, "ANY")
self.assertEqual(strategy, "ANY")
def test_create_network(self):
self.driver.create_network(context=self.context, network_name="public")

View File

@@ -2185,3 +2185,86 @@ class QuarkIpamTestIpAddressFailure(test_base.TestBase):
net_id = "8f6555ca-fbe7-49db-8240-1cb84202c1f7"
with self.assertRaises(exceptions.IpAddressGenerationFailure):
raise quark.ipam.ip_address_failure(net_id)
class IronicIpamTestSelectSubnet(QuarkIpamBaseTest):
def setUp(self):
super(IronicIpamTestSelectSubnet, self).setUp()
self.ipam = quark.ipam.IronicIpamANY()
@contextlib.contextmanager
def _stubs(self, subnet, count, increments=True, marks_full=True):
with contextlib.nested(
mock.patch("quark.db.api.subnet_find_unused"),
mock.patch("quark.db.api.subnet_update_next_auto_assign_ip"),
mock.patch("quark.db.api.subnet_update_set_full"),
mock.patch("sqlalchemy.orm.session.Session.refresh"),
) as (subnet_find, subnet_incr, subnet_set_full, refresh):
sub_mods = []
sub_mods.append((subnet_helper(subnet), count))
def subnet_increment(context, sub):
if increments:
sub["next_auto_assign_ip"] += 1
return True
return False
def set_full_mock(context, sub):
if marks_full:
sub["next_auto_assign_ip"] = -1
return True
return False
subnet_find.return_value = sub_mods
subnet_incr.side_effect = subnet_increment
subnet_set_full.side_effect = set_full_mock
yield sub_mods, subnet_find, refresh
def test_select_subnet_returns_unused(self):
subnet = dict(id=1, first_ip=2, last_ip=2, next_auto_assign_ip=2,
cidr="0.0.0.0/30", ip_version=4, network_id=1,
ip_policy=dict(size=3, exclude=[
models.IPPolicyCIDR(cidr="0.0.0.4/32"),
models.IPPolicyCIDR(cidr="0.0.0.0/31")]))
with self._stubs(subnet, 0) as (subnets, subnet_find, refresh):
s = self.ipam.select_subnet(self.context, subnet["network_id"],
None, None)
self.assertEqual(s, subnets[0][0])
def test_select_subnet_does_not_return_used(self):
subnet = dict(id=1, first_ip=2, last_ip=2, next_auto_assign_ip=2,
cidr="0.0.0.0/30", ip_version=4, network_id=1,
ip_policy=dict(exclude=[
models.IPPolicyCIDR(cidr="0.0.0.4/32"),
models.IPPolicyCIDR(cidr="0.0.0.0/31")]))
with self._stubs(subnet, 1) as (subnets, subnet_find, refresh):
s = self.ipam.select_subnet(self.context, subnet["network_id"],
None, None)
self.assertEqual(s, None)
def test_select_subnet_v4_locks(self):
subnet = dict(id=1, first_ip=0, last_ip=18446744073709551615L,
cidr="::0/64", ip_version=6,
next_auto_assign_ip=1,
ip_policy=None, network_id=1)
with self._stubs(subnet, 0) as (subnets, subnet_find, refresh):
self.ipam.select_subnet(self.context, subnet["network_id"],
None, None, ip_version=6)
subnet_find.assert_called_with(self.context, 1, lock_subnets=True,
subnet_id=None, scope="all",
segment_id=None, ip_version=6)
def test_select_subnet_v6_locks(self):
subnet = dict(id=1, first_ip=0, last_ip=18446744073709551615L,
cidr="::0/64", ip_version=6,
next_auto_assign_ip=1,
ip_policy=None, network_id=1)
with self._stubs(subnet, 0) as (subnets, subnet_find, refresh):
self.ipam.select_subnet(self.context, subnet["network_id"],
None, None, ip_version=6)
subnet_find.assert_called_with(self.context, 1, lock_subnets=True,
subnet_id=None, scope="all",
segment_id=None, ip_version=6)

View File

@@ -0,0 +1,633 @@
# Copyright 2013 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import netaddr
import json
import mock
from oslo_config import cfg
import quark.drivers.ironic_driver
from quark import network_strategy
from quark.tests import test_base
class TestIronicDriverBase(test_base.TestBase):
def setUp(self):
super(TestIronicDriverBase, self).setUp()
net_strategy = {
"1": {
"bridge": "publicnet",
"subnets": {"4": "1", "6": "2"}
},
"2": {
"bridge": "publicnet",
"subnets": {"3": "1", "6": "4"}
}
}
strategy = json.dumps(net_strategy)
quark.drivers.ironic_driver.STRATEGY = network_strategy.JSONStrategy(
strategy)
cfg.CONF.set_override("default_net_strategy", strategy,
"QUARK")
cfg.CONF.set_override("operation_delay", 0,
"IRONIC")
cfg.CONF.set_override("operation_backoff", 0,
"IRONIC")
@contextlib.contextmanager
def _stubs(self, create_port=None, delete_port=None):
importer, client, create, delete = self._create_client(
create_port, delete_port)
driver = quark.drivers.ironic_driver.IronicDriver()
yield driver, client, create, delete
def _create_client(self, create_port=None, delete_port=None):
patcher = mock.patch('quark.drivers.ironic_driver.importutils')
importer_mock = patcher.start()
self.addCleanup(patcher.stop)
client_mock = mock.Mock()
importer_mock.import_class.return_value = client_mock
create_port_mock = client_mock.return_value.create_port
if not create_port:
create_port = [{"port": {"vlan_id": 500, "id": "portid"}}]
create_port_mock.side_effect = create_port
delete_port_mock = client_mock.return_value.delete_port
if delete_port is None:
delete_port = [None]
delete_port_mock.side_effect = delete_port
return importer_mock, client_mock, create_port_mock, delete_port_mock
def _create_address(self, cidr, default_route=True):
cidr = netaddr.IPNetwork(cidr)
gateway_ip = str(netaddr.IPAddress(cidr.first + 1))
host_ip = str(netaddr.IPAddress(cidr.first + 2))
dns = [{"ip": "8.8.8.8"}, {"ip": "8.8.4.4"}]
routes = [{"cidr": "earth", "gateway": gateway_ip},
{"cidr": "moon", "gateway": gateway_ip}]
address = {}
address["address_readable"] = host_ip
subnet = {
"id": "subnet_%s" % str(cidr),
"name": "subnet_name",
"tenant_id": "fake",
"dns_nameservers": dns,
"routes": routes if not default_route else [],
"gateway_ip": gateway_ip if default_route else None,
"cidr": str(cidr)
}
address["subnet"] = subnet
return address
def _create_base_net_driver(self, driver_type):
driver = mock.Mock()
driver.get_lswitch_ids_for_network.return_value = ["lswitch1"]
driver.get_name.return_value = driver_type
return driver
class TestIronicDriver(TestIronicDriverBase):
def test_import_client_class(self):
importer, _, _, _ = self._create_client()
quark.drivers.ironic_driver.IronicDriver()
importer.import_class.assert_called_once_with(
cfg.CONF.IRONIC.ironic_client)
def test_client_is_instantiated(self):
with self._stubs() as (driver, client, _, _):
params = {
'endpoint_url': cfg.CONF.IRONIC.endpoint_url,
'timeout': cfg.CONF.IRONIC.timeout,
'insecure': cfg.CONF.IRONIC.insecure,
'ca_cert': cfg.CONF.IRONIC.ca_cert,
'auth_strategy': cfg.CONF.IRONIC.auth_strategy,
'tenant_name': cfg.CONF.IRONIC.tenant_name,
'tenant_id': cfg.CONF.IRONIC.tenant_id,
'password': cfg.CONF.IRONIC.password
}
client.assert_called_once_with(**params)
class TestIronicDriverIPAMStrategies(TestIronicDriverBase):
def test_ipam_strategies(self):
with self._stubs() as (driver, client, _, _):
self.assertEqual(
driver._ipam_strategies,
json.loads(cfg.CONF.IRONIC.ironic_ipam_strategies))
def test_invalid_ipam_strategy_raises(self):
cfg.CONF.set_override('ironic_ipam_strategies',
'{}',
'IRONIC')
self.addCleanup(cfg.CONF.clear_override,
'ironic_ipam_strategies',
'IRONIC')
self.assertRaises(quark.drivers.ironic_driver.IronicDriver)
def test_ipam_strategy_provider_overrides(self):
with self._stubs() as (driver, _, _, _):
strategy = driver.select_ipam_strategy("1", "BOTH")
self.assertEqual(strategy, "IRONIC_BOTH")
def test_ipam_strategy_provider_returns_default(self):
with self._stubs() as (driver, _, _, _):
strategy = driver.select_ipam_strategy("1", "WTF")
self.assertEqual(strategy, "IRONIC_ANY")
def test_ipam_strategy_tenant_returns_network_strategy(self):
with self._stubs() as (driver, _, _, _):
strategy = driver.select_ipam_strategy("3", "WTF")
self.assertEqual(strategy, "WTF")
class TestIronicDriverCreatePort(TestIronicDriverBase):
def test_create_port(self):
create_response = {
"port": {
"id": "port1",
"vlan_id": 120
}
}
with self._stubs(create_port=[create_response]) as (driver, _,
create, _):
network_id = "net1"
port_id = "port1"
# mock of the default driver class for the network
base_net_driver = self._create_base_net_driver(
"UNMANAGED")
kwargs = {
"device_id": "device1",
"instance_node_id": "nodeid1",
"mac_address": {"address": 1},
"base_net_driver": base_net_driver,
"addresses": [],
"security_groups": [],
}
res = driver.create_port(
self.context, network_id, port_id,
**kwargs)
expected_call = {
'port': {
'switch:hardware_id': kwargs["instance_node_id"],
'device_owner': '',
'network_type': "UNMANAGED",
'mac_address': '00:00:00:00:00:01',
'network_id': network_id,
'tenant_id': self.context.tenant_id,
'dynamic_network': True,
'fixed_ips': [],
'id': port_id,
'device_id': kwargs["device_id"]
}
}
self.assertEqual(
res, {"uuid": create_response["port"]["id"],
"vlan_id": create_response["port"]["vlan_id"]})
create.assert_called_once_with(body=expected_call)
def test_create_port_includes_lswitch_ids(self):
create_response = {
"port": {
"id": "port1",
"vlan_id": 120
}
}
with self._stubs(create_port=[create_response]) as (driver, _,
create, _):
network_id = "net1"
port_id = "port1"
# mock of the default driver class for the network
base_net_driver = self._create_base_net_driver(
"NVP")
kwargs = {
"device_id": "device1",
"instance_node_id": "nodeid1",
"mac_address": {"address": 1},
"base_net_driver": base_net_driver,
"addresses": [],
"security_groups": [],
}
res = driver.create_port(
self.context, network_id, port_id,
**kwargs)
expected_call = {
'port': {
'switch:hardware_id': kwargs["instance_node_id"],
'device_owner': '',
'network_type': "NVP",
'mac_address': '00:00:00:00:00:01',
'network_id': network_id,
'tenant_id': self.context.tenant_id,
'lswitch_id': 'lswitch1',
'dynamic_network': True,
'fixed_ips': [],
'id': port_id,
'device_id': kwargs["device_id"]
}
}
self.assertEqual(
res, {"uuid": create_response["port"]["id"],
"vlan_id": create_response["port"]["vlan_id"]})
create.assert_called_once_with(body=expected_call)
def test_create_port_retries(self):
create_response = [
Exception("uhoh"),
{"port": {"id": "port1", "vlan_id": 120}}
]
with self._stubs(create_port=create_response) as (driver, _,
create, _):
network_id = "net1"
port_id = "port1"
# mock of the default driver class for the network
base_net_driver = self._create_base_net_driver(
"UNMANAGED")
kwargs = {
"device_id": "device1",
"instance_node_id": "nodeid1",
"mac_address": {"address": 1},
"base_net_driver": base_net_driver,
"addresses": [],
"security_groups": [],
}
res = driver.create_port(
self.context, network_id, port_id,
**kwargs)
expected_call = {
'port': {
'switch:hardware_id': kwargs["instance_node_id"],
'device_owner': '',
'network_type': "UNMANAGED",
'mac_address': '00:00:00:00:00:01',
'network_id': network_id,
'tenant_id': self.context.tenant_id,
'dynamic_network': True,
'fixed_ips': [],
'id': port_id,
'device_id': kwargs["device_id"]
}
}
self.assertEqual(
res,
{"uuid": create_response[1]["port"]["id"],
"vlan_id": create_response[1]["port"]["vlan_id"]})
self.assertEqual(
create.call_args_list,
[mock.call(body=expected_call),
mock.call(body=expected_call)])
def test_create_port_raises_after_retry_failures(self):
create_response = [
Exception("uhoh"),
Exception("uhoh"),
Exception("uhoh"),
]
with self._stubs(create_port=create_response) as (driver, _,
create, _):
network_id = "net1"
port_id = "port1"
# mock of the default driver class for the network
base_net_driver = self._create_base_net_driver(
"UNMANAGED")
kwargs = {
"device_id": "device1",
"instance_node_id": "nodeid1",
"mac_address": {"address": 1},
"base_net_driver": base_net_driver,
"addresses": [],
"security_groups": [],
}
self.assertRaises(
quark.drivers.ironic_driver.IronicException,
driver.create_port,
self.context, network_id, port_id, **kwargs)
expected_call = {
'port': {
'switch:hardware_id': kwargs["instance_node_id"],
'device_owner': '',
'network_type': "UNMANAGED",
'mac_address': '00:00:00:00:00:01',
'network_id': network_id,
'tenant_id': self.context.tenant_id,
'dynamic_network': True,
'fixed_ips': [],
'id': port_id,
'device_id': kwargs["device_id"]
}
}
self.assertEqual(create.call_count, 3)
self.assertEqual(
create.call_args_list,
[mock.call(body=expected_call),
mock.call(body=expected_call),
mock.call(body=expected_call)])
def test_create_port_raises_with_missing_required_kwargs(self):
with self._stubs() as (driver, _, create, _):
network_id = "net1"
port_id = "port1"
# mock of the default driver class for the network
base_net_driver = self._create_base_net_driver(
"UNMANAGED")
kwargs = {
"device_id": "device1",
"instance_node_id": "nodeid1",
"mac_address": {"address": 1},
"base_net_driver": base_net_driver,
"addresses": [],
"security_groups": [],
}
for kwarg in ["base_net_driver", "device_id",
"instance_node_id", "mac_address"]:
val = kwargs.pop(kwarg)
self.assertRaises(
quark.drivers.ironic_driver.IronicException,
driver.create_port,
self.context, network_id, port_id, **kwargs)
kwargs[kwarg] = val
create.assert_not_called()
def test_create_port_raises_with_security_groups(self):
with self._stubs() as (driver, _, create, _):
network_id = "net1"
port_id = "port1"
# mock of the default driver class for the network
base_net_driver = self._create_base_net_driver(
"UNMANAGED")
kwargs = {
"device_id": "device1",
"instance_node_id": "nodeid1",
"mac_address": {"address": 1},
"base_net_driver": base_net_driver,
"addresses": [],
"security_groups": ["foo"],
}
self.assertRaises(
quark.drivers.ironic_driver.IronicException,
driver.create_port,
self.context, network_id, port_id, **kwargs)
create.assert_not_called()
def test_create_port_with_fixed_ips(self):
create_response = {
"port": {
"id": "port1",
"vlan_id": 120
}
}
with self._stubs(create_port=[create_response]) as (driver, _,
create, _):
network_id = "net1"
port_id = "port1"
# mock of the default driver class for the network
base_net_driver = self._create_base_net_driver(
"UNMANAGED")
kwargs = {
"device_id": "device1",
"instance_node_id": "nodeid1",
"mac_address": {"address": 1},
"base_net_driver": base_net_driver,
"addresses": [self._create_address("10.0.0.0/30"),
self._create_address("10.0.0.5/30",
default_route=False)],
"security_groups": [],
}
res = driver.create_port(
self.context, network_id, port_id,
**kwargs)
fixed_ips = [
{'subnet': {
'name': 'subnet_name',
'tenant_id': 'fake',
'dns_nameservers': ['8.8.8.8', '8.8.4.4'],
'host_routes': [],
'gateway_ip': '10.0.0.1',
'cidr': '10.0.0.0/30',
'id': 'subnet_10.0.0.0/30'},
'ip_address': '10.0.0.2'},
{'subnet': {
'name': 'subnet_name',
'tenant_id': 'fake',
'dns_nameservers': ['8.8.8.8', '8.8.4.4'],
'host_routes': [
{'nexthop': '10.0.0.5',
'destination': 'earth'},
{'nexthop': '10.0.0.5',
'destination': 'moon'}
],
'gateway_ip': None,
'cidr': '10.0.0.5/30',
'id': 'subnet_10.0.0.5/30'},
'ip_address': '10.0.0.6'}
]
expected_call = {
'port': {
'switch:hardware_id': kwargs["instance_node_id"],
'device_owner': '',
'network_type': "UNMANAGED",
'mac_address': '00:00:00:00:00:01',
'network_id': network_id,
'tenant_id': self.context.tenant_id,
'dynamic_network': True,
'fixed_ips': fixed_ips,
'id': port_id,
'device_id': kwargs["device_id"]
}
}
self.assertEqual(
res, {"uuid": create_response["port"]["id"],
"vlan_id": create_response["port"]["vlan_id"]})
create.assert_called_once_with(body=expected_call)
class TestIronicDriverDeletePort(TestIronicDriverBase):
def test_delete_port(self):
with self._stubs(delete_port=[None]) as (driver, client,
_, delete):
driver.delete_port(self.context, "foo")
delete.assert_called_once_with("foo")
def test_delete_port_retries(self):
delete_port = [Exception("uhoh"), None]
with self._stubs(delete_port=delete_port) as (driver,
client, _,
delete):
driver.delete_port(self.context, "foo")
self.assertEqual(delete.call_count, 2)
self.assertEqual(
delete.call_args_list,
[mock.call("foo"), mock.call("foo")])
def test_delete_port_fail_does_not_raise(self):
delete_port = [Exception("uhoh"),
Exception("uhoh"),
Exception("uhoh")]
with self._stubs(delete_port=delete_port) as (driver,
client, _,
delete):
driver.delete_port(self.context, "foo")
self.assertEqual(delete.call_count, 3)
self.assertEqual(
delete.call_args_list,
[mock.call("foo"), mock.call("foo"),
mock.call("foo")])
def test_delete_port_ignores_404(self):
delete_port = [Exception("404 not found"), None]
with self._stubs(delete_port=delete_port) as (driver,
client, _,
delete):
driver.delete_port(self.context, "foo")
delete.assert_called_once_with("foo")
class TestIronicDriverUpdatePort(TestIronicDriverBase):
def test_update_does_nothing(self):
with self._stubs() as (driver, client,
_, delete):
res = driver.update_port(self.context, "foo", **{})
client.update_port.assert_not_called()
self.assertEqual(res, {"uuid": "foo"})
def test_update_with_sg_raises(self):
with self._stubs() as (driver, client,
_, delete):
self.assertRaises(
quark.drivers.ironic_driver.IronicException,
driver.update_port,
self.context, "foo",
security_groups=["sg1"])
class TestIronicDriverNetwork(TestIronicDriverBase):
def test_create_network_raises(self):
with self._stubs() as (driver, _, _, _):
self.assertRaises(
NotImplementedError,
driver.create_network,
1)
def test_delete_network_raises(self):
with self._stubs() as (driver, _, _, _):
self.assertRaises(
NotImplementedError,
driver.delete_network,
1)
def test_diag_network_raises(self):
with self._stubs() as (driver, _, _, _):
self.assertRaises(
NotImplementedError,
driver.diag_network,
1)
class TestIronicDriverSecurityGroups(TestIronicDriverBase):
def test_create_security_group(self):
with self._stubs() as (driver, _, _, _):
self.assertRaises(
NotImplementedError,
driver.create_security_group,
self.context, "fake")
def test_delete_security_group(self):
with self._stubs() as (driver, _, _, _):
self.assertRaises(
NotImplementedError,
driver.delete_security_group,
self.context, "fake")
def test_update_security_group(self):
with self._stubs() as (driver, _, _, _):
self.assertRaises(
NotImplementedError,
driver.update_security_group,
self.context, "fake")
def test_create_security_group_rule(self):
with self._stubs() as (driver, _, _, _):
self.assertRaises(
NotImplementedError,
driver.create_security_group_rule,
self.context, "fake", "fake_rule")
def test_delete_security_group_rule(self):
with self._stubs() as (driver, _, _, _):
self.assertRaises(
NotImplementedError,
driver.delete_security_group_rule,
self.context, "fake", "fake_rule")

View File

@@ -384,6 +384,10 @@ class TestNVPDriverCreatePort(TestNVPDriver):
get_net_dets.return_value = net_details
yield connection
def test_select_ipam_strategy(self):
strategy = self.driver.select_ipam_strategy(1, "ANY")
self.assertEqual(strategy, "ANY")
def test_create_port_switch_exists(self):
with self._stubs(net_details=dict(foo=3)) as (connection):
port = self.driver.create_port(self.context, self.net_id,
@@ -562,7 +566,8 @@ class TestNVPDriverLswitchesForNetwork(TestNVPDriver):
with contextlib.nested(
mock.patch("%s._connection" % self.d_pkg),
) as (conn,):
connection = self._create_connection(switch_count=1)
connection = self._create_connection(
has_switches=True, switch_count=1)
conn.return_value = connection
yield connection
@@ -575,6 +580,16 @@ class TestNVPDriverLswitchesForNetwork(TestNVPDriver):
connection.query = mock.Mock(return_value=query_mock)
self.driver._lswitches_for_network(self.context, "net_uuid")
def test_get_lswitch_ids_for_network(self):
with self._stubs() as connection:
query_mock = mock.Mock()
query_mock.tags = mock.Mock()
query_mock.tagscopes = mock.Mock()
connection.query = mock.Mock(return_value=query_mock)
lswitch_ids = self.driver.get_lswitch_ids_for_network(
self.context, "net_uuid")
self.assertEqual(lswitch_ids, ['abcd'])
class TestSwitchCopying(TestNVPDriver):
def test_no_existing_switches(self):

View File

@@ -298,6 +298,10 @@ class TestOptimizedNVPDriverCreatePort(TestOptimizedNVPDriver):
get_net_dets.return_value = dict(foo=3)
yield connection, create_opt
def test_select_ipam_strategy(self):
strategy = self.driver.select_ipam_strategy(1, "ANY")
self.assertEqual(strategy, "ANY")
def test_create_port_and_maxed_switch_spanning(self):
'''Testing to ensure a switch is made when maxed.'''
with self._stubs(maxed_ports=True) as (
@@ -579,6 +583,14 @@ class TestQueryMethods(TestOptimizedNVPDriver):
self.driver._lswitches_for_network(self.context, 1)
self.assertTrue(query_return.filter.called)
def test_get_lswitch_ids_for_network(self):
with self._stubs() as query_return:
query_result = query_return.filter.return_value.all
query_result.return_value = [{"nvp_id": "foo"}]
ids = self.driver.get_lswitch_ids_for_network(self.context, 1)
self.assertTrue(query_return.filter.called)
self.assertEqual(ids, ["foo"])
def test_lswitch_from_port(self):
with self._stubs() as query_return:
self.driver._lswitch_from_port(self.context, 1)

View File

@@ -42,6 +42,10 @@ class TestUnmanagedDriver(test_base.TestBase):
def test_get_connection(self):
self.driver.get_connection()
def test_select_ipam_strategy(self):
strategy = self.driver.select_ipam_strategy(1, "ANY")
self.assertEqual(strategy, "ANY")
def test_create_network(self):
self.driver.create_network(context=self.context,
network_name="testwork")