Moves ports into a submodule

Moves ports and their associated tests into separate modules for
improved plugin readability.
This commit is contained in:
Matt Dietz
2013-07-23 19:43:50 +00:00
parent eeb3a884c1
commit ac2663aceb
6 changed files with 948 additions and 1056 deletions

View File

@@ -22,7 +22,7 @@ from oslo.config import cfg
from sqlalchemy.orm import sessionmaker, scoped_session
from zope import sqlalchemy as zsa
from neutron.api.v2 import attributes
#FIXME(mdietz): remove once all resources have moved into submods
from neutron.common import config as neutron_cfg
from neutron.common import exceptions
from neutron.db import api as neutron_db_api
@@ -32,7 +32,6 @@ from neutron.openstack.common.db.sqlalchemy import session as neutron_session
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
from neutron import quota
from neutron import neutron_plugin_base_v2
@@ -44,8 +43,10 @@ from quark import network_strategy
from quark.plugin_modules import ip_addresses
from quark.plugin_modules import ip_policies
from quark.plugin_modules import mac_address_ranges
from quark.plugin_modules import ports
from quark.plugin_modules import security_groups
from quark import plugin_views as v
from quark import utils
LOG = logging.getLogger("neutron.quark")
CONF = cfg.CONF
@@ -53,13 +54,6 @@ DEFAULT_ROUTE = netaddr.IPNetwork("0.0.0.0/0")
STRATEGY = network_strategy.STRATEGY
def _pop_param(attrs, param, default=None):
val = attrs.pop(param, default)
if val is attributes.ATTR_NOT_SPECIFIED:
return default
return val
def append_quark_extensions(conf):
"""Adds the Quark API Extensions to the extension path.
@@ -95,19 +89,6 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
self.ipam_reuse_after = CONF.QUARK.ipam_reuse_after
neutron_db_api.register_models(base=models.BASEV2)
def _make_security_group_list(self, context, group_ids):
if not group_ids or group_ids is attributes.ATTR_NOT_SPECIFIED:
return ([], [])
group_ids = list(set(group_ids))
groups = []
for gid in group_ids:
group = db_api.security_group_find(context, id=gid,
scope=db_api.ONE)
if not group:
raise sg_ext.SecurityGroupNotFound(id=gid)
groups.append(group)
return (group_ids, groups)
def _validate_subnet_cidr(self, context, network_id, new_subnet_cidr):
"""Validate the CIDR for a subnet.
@@ -163,10 +144,10 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
self._validate_subnet_cidr(context, net_id, sub_attrs["cidr"])
cidr = netaddr.IPNetwork(sub_attrs["cidr"])
gateway_ip = _pop_param(sub_attrs, "gateway_ip", str(cidr[1]))
dns_ips = _pop_param(sub_attrs, "dns_nameservers", [])
routes = _pop_param(sub_attrs, "host_routes", [])
allocation_pools = _pop_param(sub_attrs, "allocation_pools", [])
gateway_ip = utils.pop_param(sub_attrs, "gateway_ip", str(cidr[1]))
dns_ips = utils.pop_param(sub_attrs, "dns_nameservers", [])
routes = utils.pop_param(sub_attrs, "host_routes", [])
allocation_pools = utils.pop_param(sub_attrs, "allocation_pools", [])
new_subnet = db_api.subnet_create(context, **sub_attrs)
@@ -349,9 +330,9 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
#TODO(mdietz) going to ignore all the boundary and network
# type checking for now.
attrs = network["network"]
net_type = _pop_param(attrs, pnet.NETWORK_TYPE)
phys_net = _pop_param(attrs, pnet.PHYSICAL_NETWORK)
seg_id = _pop_param(attrs, pnet.SEGMENTATION_ID)
net_type = utils.pop_param(attrs, pnet.NETWORK_TYPE)
phys_net = utils.pop_param(attrs, pnet.PHYSICAL_NETWORK)
seg_id = utils.pop_param(attrs, pnet.SEGMENTATION_ID)
return net_type, phys_net, seg_id
def create_network(self, context, network):
@@ -505,304 +486,6 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
self._delete_subnet(context, subnet)
db_api.network_delete(context, net)
def create_port(self, context, port):
"""Create a port
Create a port which is a connection point of a device (e.g., a VM
NIC) to attach to a L2 Neutron network.
: param context: neutron api request context
: param port: dictionary describing the port, with keys
as listed in the RESOURCE_ATTRIBUTE_MAP object in
neutron/api/v2/attributes.py. All keys will be populated.
"""
LOG.info("create_port for tenant %s" % context.tenant_id)
port_attrs = port["port"]
mac_address = _pop_param(port_attrs, "mac_address", None)
segment_id = _pop_param(port_attrs, "segment_id")
fixed_ips = _pop_param(port_attrs, "fixed_ips")
net_id = port_attrs["network_id"]
addresses = []
port_id = uuidutils.generate_uuid()
net = db_api.network_find(context, id=net_id, shared=True,
segment_id=segment_id, scope=db_api.ONE)
if not net:
# Maybe it's a tenant network
net = db_api.network_find(context, id=net_id, scope=db_api.ONE)
if not net:
raise exceptions.NetworkNotFound(net_id=net_id)
quota.QUOTAS.limit_check(
context, context.tenant_id,
ports_per_network=len(net.get('ports', [])) + 1)
if fixed_ips:
for fixed_ip in fixed_ips:
subnet_id = fixed_ip.get("subnet_id")
ip_address = fixed_ip.get("ip_address")
if not (subnet_id and ip_address):
raise exceptions.BadRequest(
resource="fixed_ips",
msg="subnet_id and ip_address required")
addresses.append(self.ipam_driver.allocate_ip_address(
context, net["id"], port_id, self.ipam_reuse_after,
ip_address=ip_address))
else:
addresses.append(self.ipam_driver.allocate_ip_address(
context, net["id"], port_id, self.ipam_reuse_after))
group_ids, security_groups = self._make_security_group_list(
context, port["port"].pop("security_groups", None))
mac = self.ipam_driver.allocate_mac_address(context,
net["id"],
port_id,
self.ipam_reuse_after,
mac_address=mac_address)
mac_address_string = str(netaddr.EUI(mac['address'],
dialect=netaddr.mac_unix))
address_pairs = [{'mac_address': mac_address_string,
'ip_address': address.get('address_readable', '')}
for address in addresses]
backend_port = self.net_driver.create_port(context, net["id"],
port_id=port_id,
security_groups=group_ids,
allowed_pairs=address_pairs)
port_attrs["network_id"] = net["id"]
port_attrs["id"] = port_id
port_attrs["security_groups"] = security_groups
new_port = db_api.port_create(
context, addresses=addresses, mac_address=mac["address"],
backend_key=backend_port["uuid"], **port_attrs)
return v._make_port_dict(new_port)
def update_port(self, context, id, port):
"""Update values of a port.
: param context: neutron api request context
: param id: UUID representing the port to update.
: param port: dictionary with keys indicating fields to update.
valid keys are those that have a value of True for 'allow_put'
as listed in the RESOURCE_ATTRIBUTE_MAP object in
neutron/api/v2/attributes.py.
"""
LOG.info("update_port %s for tenant %s" % (id, context.tenant_id))
port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
if not port_db:
raise exceptions.PortNotFound(port_id=id)
address_pairs = []
fixed_ips = port["port"].pop("fixed_ips", None)
if fixed_ips:
self.ipam_driver.deallocate_ip_address(
context, port_db, ipam_reuse_after=self.ipam_reuse_after)
addresses = []
for fixed_ip in fixed_ips:
subnet_id = fixed_ip.get("subnet_id")
ip_address = fixed_ip.get("ip_address")
if not (subnet_id and ip_address):
raise exceptions.BadRequest(
resource="fixed_ips",
msg="subnet_id and ip_address required")
# Note: we don't allow overlapping subnets, thus subnet_id is
# ignored.
addresses.append(self.ipam_driver.allocate_ip_address(
context, port_db["network_id"], id,
self.ipam_reuse_after, ip_address=ip_address))
port["port"]["addresses"] = addresses
mac_address_string = str(netaddr.EUI(port_db.mac_address,
dialect=netaddr.mac_unix))
address_pairs = [{'mac_address': mac_address_string,
'ip_address':
address.get('address_readable', '')}
for address in addresses]
group_ids, security_groups = self._make_security_group_list(
context, port["port"].pop("security_groups", None))
self.net_driver.update_port(context,
port_id=port_db.backend_key,
security_groups=group_ids,
allowed_pairs=address_pairs)
port["port"]["security_groups"] = security_groups
port = db_api.port_update(context,
port_db,
**port["port"])
return v._make_port_dict(port)
def post_update_port(self, context, id, port):
LOG.info("post_update_port %s for tenant %s" % (id, context.tenant_id))
if not port.get("port"):
raise exceptions.BadRequest(resource="ports",
msg="Port body required")
port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
if not port_db:
raise exceptions.PortNotFound(port_id=id, net_id="")
port = port["port"]
if "fixed_ips" in port and port["fixed_ips"]:
for ip in port["fixed_ips"]:
address = None
if ip:
if "ip_id" in ip:
ip_id = ip["ip_id"]
address = db_api.ip_address_find(
context,
id=ip_id,
tenant_id=context.tenant_id,
scope=db_api.ONE)
elif "ip_address" in ip:
ip_address = ip["ip_address"]
net_address = netaddr.IPAddress(ip_address)
address = db_api.ip_address_find(
context,
ip_address=net_address,
network_id=port_db["network_id"],
tenant_id=context.tenant_id,
scope=db_api.ONE)
if not address:
address = self.ipam_driver.allocate_ip_address(
context,
port_db["network_id"],
id,
self.ipam_reuse_after,
ip_address=ip_address)
else:
address = self.ipam_driver.allocate_ip_address(
context,
port_db["network_id"],
id,
self.ipam_reuse_after)
address["deallocated"] = 0
already_contained = False
for port_address in port_db["ip_addresses"]:
if address["id"] == port_address["id"]:
already_contained = True
break
if not already_contained:
port_db["ip_addresses"].append(address)
return v._make_port_dict(port_db)
def get_port(self, context, id, fields=None):
"""Retrieve a port.
: param context: neutron api request context
: param id: UUID representing the port to fetch.
: param fields: a list of strings that are valid keys in a
port dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
object in neutron/api/v2/attributes.py. Only these fields
will be returned.
"""
LOG.info("get_port %s for tenant %s fields %s" %
(id, context.tenant_id, fields))
results = db_api.port_find(context, id=id, fields=fields,
scope=db_api.ONE)
if not results:
raise exceptions.PortNotFound(port_id=id, net_id='')
return v._make_port_dict(results)
def get_ports(self, context, filters=None, fields=None):
"""Retrieve a list of ports.
The contents of the list depends on the identity of the user
making the request (as indicated by the context) as well as any
filters.
: param context: neutron api request context
: param filters: a dictionary with keys that are valid keys for
a port as listed in the RESOURCE_ATTRIBUTE_MAP object
in neutron/api/v2/attributes.py. Values in this dictiontary
are an iterable containing values that will be used for an exact
match comparison for that value. Each result returned by this
function will have matched one of the values for each key in
filters.
: param fields: a list of strings that are valid keys in a
port dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
object in neutron/api/v2/attributes.py. Only these fields
will be returned.
"""
LOG.info("get_ports for tenant %s filters %s fields %s" %
(context.tenant_id, filters, fields))
if filters is None:
filters = {}
query = db_api.port_find(context, fields=fields, **filters)
return v._make_ports_list(query, fields)
def get_ports_count(self, context, filters=None):
"""Return the number of ports.
The result depends on the identity of the user making the request
(as indicated by the context) as well as any filters.
: param context: neutron api request context
: param filters: a dictionary with keys that are valid keys for
a network as listed in the RESOURCE_ATTRIBUTE_MAP object
in neutron/api/v2/attributes.py. Values in this dictiontary
are an iterable containing values that will be used for an exact
match comparison for that value. Each result returned by this
function will have matched one of the values for each key in
filters.
NOTE: this method is optional, as it was not part of the originally
defined plugin API.
"""
LOG.info("get_ports_count for tenant %s filters %s" %
(context.tenant_id, filters))
return db_api.port_count_all(context, **filters)
def delete_port(self, context, id):
"""Delete a port.
: param context: neutron api request context
: param id: UUID representing the port to delete.
"""
LOG.info("delete_port %s for tenant %s" %
(id, context.tenant_id))
port = db_api.port_find(context, id=id, scope=db_api.ONE)
if not port:
raise exceptions.PortNotFound(net_id=id)
backend_key = port["backend_key"]
mac_address = netaddr.EUI(port["mac_address"]).value
self.ipam_driver.deallocate_mac_address(context,
mac_address)
self.ipam_driver.deallocate_ip_address(
context, port, ipam_reuse_after=self.ipam_reuse_after)
db_api.port_delete(context, port)
self.net_driver.delete_port(context, backend_key)
def disassociate_port(self, context, id, ip_address_id):
"""Disassociates a port from an IP address.
: param context: neutron api request context
: param id: UUID representing the port to disassociate.
: param ip_address_id: UUID representing the IP address to
disassociate.
"""
LOG.info("disassociate_port %s for tenant %s ip_address_id %s" %
(id, context.tenant_id, ip_address_id))
port = db_api.port_find(context, id=id, ip_address_id=[ip_address_id],
scope=db_api.ONE)
if not port:
raise exceptions.PortNotFound(port_id=id, net_id='')
the_address = [address for address in port["ip_addresses"]
if address["id"] == ip_address_id][0]
port["ip_addresses"] = [address for address in port["ip_addresses"]
if address.id != ip_address_id]
if len(the_address["ports"]) == 0:
the_address["deallocated"] = 1
return v._make_port_dict(port)
def get_route(self, context, id):
LOG.info("get_route %s for tenant %s" % (id, context.tenant_id))
route = db_api.route_find(context, id=id, scope=db_api.ONE)
@@ -920,3 +603,27 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
def update_ip_address(self, context, id, ip_address):
return ip_addresses.update_ip_address(context, id, ip_address)
def create_port(self, context, port):
return ports.create_port(context, port)
def post_update_port(self, context, id, port):
return ports.post_update_port(context, id, port)
def get_port(self, context, id, fields=None):
return ports.get_port(context, id, fields)
def update_port(self, context, id, port):
return ports.update_port(context, id, port)
def get_ports(self, context, filters=None, fields=None):
return ports.get_ports(context, filters, fields)
def get_ports_count(self, context, filters=None):
return ports.get_ports_count(context, filters)
def delete_port(self, context, id):
return ports.delete_port(context, id)
def disassociate_port(self, context, id, ip_address_id):
return ports.disassociate_port(context, id, ip_address_id)

View File

@@ -0,0 +1,328 @@
# Copyright 2013 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from neutron.common import exceptions
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
from neutron import quota
from oslo.config import cfg
from quark.db import api as db_api
from quark import plugin_views as v
from quark import utils
CONF = cfg.CONF
LOG = logging.getLogger("neutron.quark")
ipam_driver = (importutils.import_class(CONF.QUARK.ipam_driver))()
net_driver = (importutils.import_class(CONF.QUARK.net_driver))()
net_driver.load_config(CONF.QUARK.net_driver_cfg)
def create_port(context, port):
"""Create a port
Create a port which is a connection point of a device (e.g., a VM
NIC) to attach to a L2 Neutron network.
: param context: neutron api request context
: param port: dictionary describing the port, with keys
as listed in the RESOURCE_ATTRIBUTE_MAP object in
neutron/api/v2/attributes.py. All keys will be populated.
"""
LOG.info("create_port for tenant %s" % context.tenant_id)
port_attrs = port["port"]
mac_address = utils.pop_param(port_attrs, "mac_address", None)
segment_id = utils.pop_param(port_attrs, "segment_id")
fixed_ips = utils.pop_param(port_attrs, "fixed_ips")
net_id = port_attrs["network_id"]
addresses = []
port_id = uuidutils.generate_uuid()
net = db_api.network_find(context, id=net_id, shared=True,
segment_id=segment_id, scope=db_api.ONE)
if not net:
# Maybe it's a tenant network
net = db_api.network_find(context, id=net_id, scope=db_api.ONE)
if not net:
raise exceptions.NetworkNotFound(net_id=net_id)
quota.QUOTAS.limit_check(
context, context.tenant_id,
ports_per_network=len(net.get('ports', [])) + 1)
if fixed_ips:
for fixed_ip in fixed_ips:
subnet_id = fixed_ip.get("subnet_id")
ip_address = fixed_ip.get("ip_address")
if not (subnet_id and ip_address):
raise exceptions.BadRequest(
resource="fixed_ips",
msg="subnet_id and ip_address required")
addresses.append(ipam_driver.allocate_ip_address(
context, net["id"], port_id, CONF.QUARK.ipam_reuse_after,
ip_address=ip_address))
else:
addresses.append(ipam_driver.allocate_ip_address(
context, net["id"], port_id, CONF.QUARK.ipam_reuse_after))
group_ids, security_groups = v.make_security_group_list(
context, port["port"].pop("security_groups", None))
mac = ipam_driver.allocate_mac_address(context, net["id"], port_id,
CONF.QUARK.ipam_reuse_after,
mac_address=mac_address)
mac_address_string = str(netaddr.EUI(mac['address'],
dialect=netaddr.mac_unix))
address_pairs = [{'mac_address': mac_address_string,
'ip_address': address.get('address_readable', '')}
for address in addresses]
backend_port = net_driver.create_port(context, net["id"], port_id=port_id,
security_groups=group_ids,
allowed_pairs=address_pairs)
port_attrs["network_id"] = net["id"]
port_attrs["id"] = port_id
port_attrs["security_groups"] = security_groups
new_port = db_api.port_create(
context, addresses=addresses, mac_address=mac["address"],
backend_key=backend_port["uuid"], **port_attrs)
return v._make_port_dict(new_port)
def update_port(context, id, port):
"""Update values of a port.
: param context: neutron api request context
: param id: UUID representing the port to update.
: param port: dictionary with keys indicating fields to update.
valid keys are those that have a value of True for 'allow_put'
as listed in the RESOURCE_ATTRIBUTE_MAP object in
neutron/api/v2/attributes.py.
"""
LOG.info("update_port %s for tenant %s" % (id, context.tenant_id))
port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
if not port_db:
raise exceptions.PortNotFound(port_id=id)
address_pairs = []
fixed_ips = port["port"].pop("fixed_ips", None)
if fixed_ips:
ipam_driver.deallocate_ip_address(
context, port_db, ipam_reuse_after=CONF.QUARK.ipam_reuse_after)
addresses = []
for fixed_ip in fixed_ips:
subnet_id = fixed_ip.get("subnet_id")
ip_address = fixed_ip.get("ip_address")
if not (subnet_id and ip_address):
raise exceptions.BadRequest(
resource="fixed_ips",
msg="subnet_id and ip_address required")
# Note: we don't allow overlapping subnets, thus subnet_id is
# ignored.
addresses.append(ipam_driver.allocate_ip_address(
context, port_db["network_id"], id,
CONF.QUARK.ipam_reuse_after, ip_address=ip_address))
port["port"]["addresses"] = addresses
mac_address_string = str(netaddr.EUI(port_db.mac_address,
dialect=netaddr.mac_unix))
address_pairs = [{'mac_address': mac_address_string,
'ip_address':
address.get('address_readable', '')}
for address in addresses]
group_ids, security_groups = v.make_security_group_list(
context, port["port"].pop("security_groups", None))
net_driver.update_port(context, port_id=port_db.backend_key,
security_groups=group_ids,
allowed_pairs=address_pairs)
port["port"]["security_groups"] = security_groups
port = db_api.port_update(context, port_db, **port["port"])
return v._make_port_dict(port)
def post_update_port(context, id, port):
LOG.info("post_update_port %s for tenant %s" % (id, context.tenant_id))
if not port.get("port"):
raise exceptions.BadRequest(resource="ports",
msg="Port body required")
port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
if not port_db:
raise exceptions.PortNotFound(port_id=id, net_id="")
port = port["port"]
if "fixed_ips" in port and port["fixed_ips"]:
for ip in port["fixed_ips"]:
address = None
if ip:
if "ip_id" in ip:
ip_id = ip["ip_id"]
address = db_api.ip_address_find(
context,
id=ip_id,
tenant_id=context.tenant_id,
scope=db_api.ONE)
elif "ip_address" in ip:
ip_address = ip["ip_address"]
net_address = netaddr.IPAddress(ip_address)
address = db_api.ip_address_find(
context,
ip_address=net_address,
network_id=port_db["network_id"],
tenant_id=context.tenant_id,
scope=db_api.ONE)
if not address:
address = ipam_driver.allocate_ip_address(
context, port_db["network_id"], id,
CONF.QUARK.ipam_reuse_after, ip_address=ip_address)
else:
address = ipam_driver.allocate_ip_address(
context, port_db["network_id"], id,
CONF.QUARK.ipam_reuse_after)
address["deallocated"] = 0
already_contained = False
for port_address in port_db["ip_addresses"]:
if address["id"] == port_address["id"]:
already_contained = True
break
if not already_contained:
port_db["ip_addresses"].append(address)
return v._make_port_dict(port_db)
def get_port(context, id, fields=None):
"""Retrieve a port.
: param context: neutron api request context
: param id: UUID representing the port to fetch.
: param fields: a list of strings that are valid keys in a
port dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
object in neutron/api/v2/attributes.py. Only these fields
will be returned.
"""
LOG.info("get_port %s for tenant %s fields %s" %
(id, context.tenant_id, fields))
results = db_api.port_find(context, id=id, fields=fields,
scope=db_api.ONE)
if not results:
raise exceptions.PortNotFound(port_id=id, net_id='')
return v._make_port_dict(results)
def get_ports(context, filters=None, fields=None):
"""Retrieve a list of ports.
The contents of the list depends on the identity of the user
making the request (as indicated by the context) as well as any
filters.
: param context: neutron api request context
: param filters: a dictionary with keys that are valid keys for
a port as listed in the RESOURCE_ATTRIBUTE_MAP object
in neutron/api/v2/attributes.py. Values in this dictiontary
are an iterable containing values that will be used for an exact
match comparison for that value. Each result returned by this
function will have matched one of the values for each key in
filters.
: param fields: a list of strings that are valid keys in a
port dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
object in neutron/api/v2/attributes.py. Only these fields
will be returned.
"""
LOG.info("get_ports for tenant %s filters %s fields %s" %
(context.tenant_id, filters, fields))
if filters is None:
filters = {}
query = db_api.port_find(context, fields=fields, **filters)
return v._make_ports_list(query, fields)
def get_ports_count(context, filters=None):
"""Return the number of ports.
The result depends on the identity of the user making the request
(as indicated by the context) as well as any filters.
: param context: neutron api request context
: param filters: a dictionary with keys that are valid keys for
a network as listed in the RESOURCE_ATTRIBUTE_MAP object
in neutron/api/v2/attributes.py. Values in this dictiontary
are an iterable containing values that will be used for an exact
match comparison for that value. Each result returned by this
function will have matched one of the values for each key in
filters.
NOTE: this method is optional, as it was not part of the originally
defined plugin API.
"""
LOG.info("get_ports_count for tenant %s filters %s" %
(context.tenant_id, filters))
return db_api.port_count_all(context, **filters)
def delete_port(context, id):
"""Delete a port.
: param context: neutron api request context
: param id: UUID representing the port to delete.
"""
LOG.info("delete_port %s for tenant %s" %
(id, context.tenant_id))
port = db_api.port_find(context, id=id, scope=db_api.ONE)
if not port:
raise exceptions.PortNotFound(net_id=id)
backend_key = port["backend_key"]
mac_address = netaddr.EUI(port["mac_address"]).value
ipam_driver.deallocate_mac_address(context, mac_address)
ipam_driver.deallocate_ip_address(
context, port, ipam_reuse_after=CONF.QUARK.ipam_reuse_after)
db_api.port_delete(context, port)
net_driver.delete_port(context, backend_key)
def disassociate_port(context, id, ip_address_id):
"""Disassociates a port from an IP address.
: param context: neutron api request context
: param id: UUID representing the port to disassociate.
: param ip_address_id: UUID representing the IP address to
disassociate.
"""
LOG.info("disassociate_port %s for tenant %s ip_address_id %s" %
(id, context.tenant_id, ip_address_id))
port = db_api.port_find(context, id=id, ip_address_id=[ip_address_id],
scope=db_api.ONE)
if not port:
raise exceptions.PortNotFound(port_id=id, net_id='')
the_address = [address for address in port["ip_addresses"]
if address["id"] == ip_address_id][0]
port["ip_addresses"] = [address for address in port["ip_addresses"]
if address.id != ip_address_id]
if len(the_address["ports"]) == 0:
the_address["deallocated"] = 1
return v._make_port_dict(port)

View File

@@ -19,8 +19,13 @@ View Helpers for Quark Plugin
import netaddr
from neutron.extensions import securitygroup as sg_ext
from quark.db import api as db_api
from quark.ipam import QuarkIpam
from quark import network_strategy
from quark import utils
STRATEGY = network_strategy.STRATEGY
@@ -201,3 +206,17 @@ def _make_ip_policy_dict(ipp):
"subnet_id": ipp["subnet_id"],
"network_id": ipp["network_id"],
"exclude": excludes}
def make_security_group_list(context, group_ids):
if not group_ids or not utils.attr_specified(group_ids):
return ([], [])
group_ids = list(set(group_ids))
groups = []
for gid in group_ids:
group = db_api.security_group_find(context, id=gid,
scope=db_api.ONE)
if not group:
raise sg_ext.SecurityGroupNotFound(id=gid)
groups.append(group)
return (group_ids, groups)

View File

@@ -0,0 +1,540 @@
# Copyright 2013 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
from neutron.api.v2 import attributes as neutron_attrs
from neutron.common import exceptions
from neutron.extensions import securitygroup as sg_ext
from quark.db import api as quark_db_api
from quark.db import models
from quark.tests import test_quark_plugin
class TestQuarkGetPorts(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, ports=None, addrs=None):
port_models = []
addr_models = None
if addrs:
addr_models = []
for address in addrs:
a = models.IPAddress()
a.update(address)
addr_models.append(a)
if isinstance(ports, list):
for port in ports:
port_model = models.Port()
port_model.update(port)
if addr_models:
port_model.ip_addresses = addr_models
port_models.append(port_model)
elif ports is None:
port_models = None
else:
port_model = models.Port()
port_model.update(ports)
if addr_models:
port_model.ip_addresses = addr_models
port_models = port_model
db_mod = "quark.db.api"
with contextlib.nested(
mock.patch("%s.port_find" % db_mod)
) as (port_find,):
port_find.return_value = port_models
yield
def test_port_list_no_ports(self):
with self._stubs(ports=[]):
ports = self.plugin.get_ports(self.context, filters=None,
fields=None)
self.assertEqual(ports, [])
def test_port_list_with_ports(self):
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
port = dict(mac_address="aa:bb:cc:dd:ee:ff", network_id=1,
tenant_id=self.context.tenant_id, device_id=2)
expected = {'status': None,
'device_owner': None,
'mac_address': 'aa:bb:cc:dd:ee:ff',
'network_id': 1,
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'device_id': 2}
with self._stubs(ports=[port], addrs=[ip]):
ports = self.plugin.get_ports(self.context, filters=None,
fields=None)
self.assertEqual(len(ports), 1)
fixed_ips = ports[0].pop("fixed_ips")
for key in expected.keys():
self.assertEqual(ports[0][key], expected[key])
self.assertEqual(fixed_ips[0]["subnet_id"], ip["subnet_id"])
self.assertEqual(fixed_ips[0]["ip_address"],
ip["address_readable"])
def test_port_show(self):
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
port = dict(mac_address="AA:BB:CC:DD:EE:FF", network_id=1,
tenant_id=self.context.tenant_id, device_id=2)
expected = {'status': None,
'device_owner': None,
'mac_address': 'AA:BB:CC:DD:EE:FF',
'network_id': 1,
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'device_id': 2}
with self._stubs(ports=port, addrs=[ip]):
result = self.plugin.get_port(self.context, 1)
fixed_ips = result.pop("fixed_ips")
for key in expected.keys():
self.assertEqual(result[key], expected[key])
self.assertEqual(fixed_ips[0]["subnet_id"], ip["subnet_id"])
self.assertEqual(fixed_ips[0]["ip_address"],
ip["address_readable"])
def test_port_show_with_int_mac(self):
port = dict(mac_address=187723572702975L, network_id=1,
tenant_id=self.context.tenant_id, device_id=2)
expected = {'status': None,
'device_owner': None,
'mac_address': 'aa:bb:cc:dd:ee:ff',
'network_id': 1,
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': [],
'device_id': 2}
with self._stubs(ports=port):
result = self.plugin.get_port(self.context, 1)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
def test_port_show_not_found(self):
with self._stubs(ports=None):
with self.assertRaises(exceptions.PortNotFound):
self.plugin.get_port(self.context, 1)
class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port=None, network=None, addr=None, mac=None):
port_model = models.Port()
port_model.update(port)
port_models = port_model
db_mod = "quark.db.api"
ipam = "quark.ipam.QuarkIpam"
with contextlib.nested(
mock.patch("%s.port_create" % db_mod),
mock.patch("%s.network_find" % db_mod),
mock.patch("%s.allocate_ip_address" % ipam),
mock.patch("%s.allocate_mac_address" % ipam),
) as (port_create, net_find, alloc_ip, alloc_mac):
port_create.return_value = port_models
net_find.return_value = network
alloc_ip.return_value = addr
alloc_mac.return_value = mac
yield port_create
def test_create_port(self):
network = dict(id=1)
mac = dict(address="aa:bb:cc:dd:ee:ff")
port_name = "foobar"
ip = dict()
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
name=port_name))
expected = {'status': None,
'name': port_name,
'device_owner': None,
'mac_address': mac["address"],
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': [],
'device_id': 2}
with self._stubs(port=port["port"], network=network, addr=ip,
mac=mac) as port_create:
result = self.plugin.create_port(self.context, port)
self.assertTrue(port_create.called)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
def test_create_port_mac_address_not_specified(self):
network = dict(id=1)
mac = dict(address="aa:bb:cc:dd:ee:ff")
ip = dict()
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2))
expected = {'status': None,
'device_owner': None,
'mac_address': mac["address"],
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': [],
'device_id': 2}
with self._stubs(port=port["port"], network=network, addr=ip,
mac=mac) as port_create:
port["port"]["mac_address"] = neutron_attrs.ATTR_NOT_SPECIFIED
result = self.plugin.create_port(self.context, port)
self.assertTrue(port_create.called)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
def test_create_port_fixed_ips(self):
network = dict(id=1)
mac = dict(address="aa:bb:cc:dd:ee:ff")
ip = mock.MagicMock()
ip.get = lambda x, *y: 1 if x == "subnet_id" else None
ip.formatted = lambda: "192.168.10.45"
fixed_ips = [dict(subnet_id=1, ip_address="192.168.10.45")]
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
fixed_ips=fixed_ips, ip_addresses=[ip]))
expected = {'status': None,
'device_owner': None,
'mac_address': mac["address"],
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': fixed_ips,
'device_id': 2}
with self._stubs(port=port["port"], network=network, addr=ip,
mac=mac) as port_create:
result = self.plugin.create_port(self.context, port)
self.assertTrue(port_create.called)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
def test_create_port_fixed_ips_bad_request(self):
network = dict(id=1)
mac = dict(address="aa:bb:cc:dd:ee:ff")
ip = mock.MagicMock()
ip.get = lambda x: 1 if x == "subnet_id" else None
ip.formatted = lambda: "192.168.10.45"
fixed_ips = [dict()]
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
fixed_ips=fixed_ips, ip_addresses=[ip]))
with self._stubs(port=port["port"], network=network, addr=ip,
mac=mac):
with self.assertRaises(exceptions.BadRequest):
self.plugin.create_port(self.context, port)
def test_create_port_no_network_found(self):
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2))
with self._stubs(network=None, port=port["port"]):
with self.assertRaises(exceptions.NetworkNotFound):
self.plugin.create_port(self.context, port)
def test_create_port_net_at_max(self):
network = dict(id=1, ports=[models.Port()])
mac = dict(address="aa:bb:cc:dd:ee:ff")
port_name = "foobar"
ip = dict()
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
name=port_name))
with self._stubs(port=port["port"], network=network, addr=ip, mac=mac):
with self.assertRaises(exceptions.OverQuota):
self.plugin.create_port(self.context, port)
def test_create_port_security_groups(self, groups=[1]):
network = dict(id=1)
mac = dict(address="aa:bb:cc:dd:ee:ff")
port_name = "foobar"
ip = dict()
group = models.SecurityGroup()
group.update({'id': 1, 'tenant_id': self.context.tenant_id,
'name': 'foo', 'description': 'bar'})
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
name=port_name, security_groups=[group]))
expected = {'status': None,
'name': port_name,
'device_owner': None,
'mac_address': mac["address"],
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': [],
'security_groups': groups,
'device_id': 2}
with self._stubs(port=port["port"], network=network, addr=ip,
mac=mac) as port_create:
with mock.patch("quark.db.api.security_group_find") as group_find:
group_find.return_value = (groups and group)
port["port"]["security_groups"] = groups or [1]
result = self.plugin.create_port(self.context, port)
self.assertTrue(port_create.called)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
def test_create_port_security_groups_not_found(self):
with self.assertRaises(sg_ext.SecurityGroupNotFound):
self.test_create_port_security_groups([])
class TestQuarkUpdatePort(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port):
port_model = None
if port:
port_model = models.Port()
port_model.update(port)
with contextlib.nested(
mock.patch("quark.db.api.port_find"),
mock.patch("quark.db.api.port_update"),
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address"),
mock.patch("quark.ipam.QuarkIpam.deallocate_ip_address")
) as (port_find, port_update, alloc_ip, dealloc_ip):
port_find.return_value = port_model
yield port_find, port_update, alloc_ip, dealloc_ip
def test_update_port_not_found(self):
with self._stubs(port=None):
with self.assertRaises(exceptions.PortNotFound):
self.plugin.update_port(self.context, 1, {})
def test_update_port(self):
with self._stubs(
port=dict(id=1, name="myport")
) as (port_find, port_update, alloc_ip, dealloc_ip):
new_port = dict(port=dict(name="ourport"))
self.plugin.update_port(self.context, 1, new_port)
self.assertEqual(port_find.call_count, 1)
port_update.assert_called_once_with(
self.context,
port_find(),
name="ourport",
security_groups=[])
def test_update_port_fixed_ip_bad_request(self):
with self._stubs(
port=dict(id=1, name="myport")
) as (port_find, port_update, alloc_ip, dealloc_ip):
new_port = dict(port=dict(
fixed_ips=[dict(subnet_id=None,
ip_address=None)]))
with self.assertRaises(exceptions.BadRequest):
self.plugin.update_port(self.context, 1, new_port)
def test_update_port_fixed_ip(self):
with self._stubs(
port=dict(id=1, name="myport", mac_address="0:0:0:0:0:1")
) as (port_find, port_update, alloc_ip, dealloc_ip):
new_port = dict(port=dict(
fixed_ips=[dict(subnet_id=1,
ip_address="1.1.1.1")]))
self.plugin.update_port(self.context, 1, new_port)
self.assertEqual(dealloc_ip.call_count, 1)
self.assertEqual(alloc_ip.call_count, 1)
class TestQuarkPostUpdatePort(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port, addr, addr2=None):
port_model = None
addr_model = None
addr_model2 = None
if port:
port_model = models.Port()
port_model.update(port)
if addr:
addr_model = models.IPAddress()
addr_model.update(addr)
if addr2:
addr_model2 = models.IPAddress()
addr_model2.update(addr2)
with contextlib.nested(
mock.patch("quark.db.api.port_find"),
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address"),
mock.patch("quark.db.api.ip_address_find")
) as (port_find, alloc_ip, ip_find):
port_find.return_value = port_model
alloc_ip.return_value = addr_model2 if addr_model2 else addr_model
ip_find.return_value = addr_model
yield port_find, alloc_ip, ip_find
def test_post_update_port_no_ports(self):
with self.assertRaises(exceptions.PortNotFound):
self.plugin.post_update_port(self.context, 1,
{"port": {"network_id": 1}})
def test_post_update_port_fixed_ips_empty_body(self):
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2))
with self._stubs(port=port, addr=None):
with self.assertRaises(exceptions.BadRequest):
self.plugin.post_update_port(self.context, 1, {})
with self.assertRaises(exceptions.BadRequest):
self.plugin.post_update_port(self.context, 1, {"port": {}})
def test_post_update_port_fixed_ips_ip(self):
new_port_ip = dict(port=dict(fixed_ips=[dict()]))
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2))
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, deallocated=True)
with self._stubs(port=port, addr=ip) as (port_find, alloc_ip, ip_find):
response = self.plugin.post_update_port(self.context, 1,
new_port_ip)
self.assertEqual(port_find.call_count, 1)
self.assertEqual(alloc_ip.call_count, 1)
self.assertEqual(ip_find.call_count, 0)
self.assertEqual(response["fixed_ips"][0]["ip_address"],
"192.168.1.100")
def test_post_update_port_fixed_ips_ip_id(self):
new_port_ip = dict(port=dict(fixed_ips=[dict(ip_id=1)]))
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2))
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, deallocated=True)
with self._stubs(port=port, addr=ip) as (port_find, alloc_ip, ip_find):
response = self.plugin.post_update_port(self.context, 1,
new_port_ip)
self.assertEqual(port_find.call_count, 1)
self.assertEqual(alloc_ip.call_count, 0)
self.assertEqual(ip_find.call_count, 1)
self.assertEqual(response["fixed_ips"][0]["ip_address"],
"192.168.1.100")
def test_post_update_port_fixed_ips_ip_address_exists(self):
new_port_ip = dict(port=dict(fixed_ips=[dict(
ip_address="192.168.1.100")]))
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2))
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, deallocated=True)
with self._stubs(port=port, addr=ip) as (port_find, alloc_ip, ip_find):
response = self.plugin.post_update_port(self.context, 1,
new_port_ip)
self.assertEqual(port_find.call_count, 1)
self.assertEqual(alloc_ip.call_count, 0)
self.assertEqual(ip_find.call_count, 1)
self.assertEqual(response["fixed_ips"][0]["ip_address"],
"192.168.1.100")
def test_post_update_port_fixed_ips_ip_address_doesnt_exist(self):
new_port_ip = dict(port=dict(fixed_ips=[dict(
ip_address="192.168.1.101")]))
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2))
ip = dict(id=1, address=3232235876, address_readable="192.168.1.101",
subnet_id=1, network_id=2, version=4, deallocated=True)
with self._stubs(port=port, addr=None, addr2=ip) as \
(port_find, alloc_ip, ip_find):
response = self.plugin.post_update_port(self.context, 1,
new_port_ip)
self.assertEqual(port_find.call_count, 1)
self.assertEqual(alloc_ip.call_count, 1)
self.assertEqual(ip_find.call_count, 1)
self.assertEqual(response["fixed_ips"][0]["ip_address"],
"192.168.1.101")
class TestQuarkGetPortCount(test_quark_plugin.TestQuarkPlugin):
def test_get_port_count(self):
"""This isn't really testable."""
with mock.patch("quark.db.api.port_count_all"):
self.plugin.get_ports_count(self.context, {})
class TestQuarkDeletePort(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port=None, addr=None, mac=None):
port_models = None
if port:
port_model = models.Port()
port_model.update(port)
port_models = port_model
db_mod = "quark.db.api"
ipam = "quark.ipam.QuarkIpam"
with contextlib.nested(
mock.patch("%s.port_find" % db_mod),
mock.patch("%s.deallocate_ip_address" % ipam),
mock.patch("%s.deallocate_mac_address" % ipam),
mock.patch("%s.port_delete" % db_mod),
mock.patch("quark.drivers.base.BaseDriver.delete_port")
) as (port_find, dealloc_ip, dealloc_mac, db_port_del,
driver_port_del):
port_find.return_value = port_models
dealloc_ip.return_value = addr
dealloc_mac.return_value = mac
yield db_port_del, driver_port_del
def test_port_delete(self):
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2, mac_address="AA:BB:CC:DD:EE:FF",
backend_key="foo"))
with self._stubs(port=port["port"]) as (db_port_del, driver_port_del):
self.plugin.delete_port(self.context, 1)
self.assertTrue(db_port_del.called)
driver_port_del.assert_called_with(self.context, "foo")
def test_port_delete_port_not_found_fails(self):
with self._stubs(port=None) as (db_port_del, driver_port_del):
with self.assertRaises(exceptions.PortNotFound):
self.plugin.delete_port(self.context, 1)
class TestQuarkDisassociatePort(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port=None):
port_models = None
if port:
port_model = models.Port()
port_model.update(port)
for ip in port["fixed_ips"]:
port_model.ip_addresses.append(models.IPAddress(
id=1,
address=ip["ip_address"],
subnet_id=ip["subnet_id"]))
port_models = port_model
db_mod = "quark.db.api"
with mock.patch("%s.port_find" % db_mod) as port_find:
port_find.return_value = port_models
yield port_find
def test_port_disassociate_port(self):
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
fixed_ips = [{"subnet_id": ip["subnet_id"],
"ip_address": ip["address_readable"]}]
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2, mac_address="AA:BB:CC:DD:EE:FF",
backend_key="foo", fixed_ips=fixed_ips))
with self._stubs(port=port["port"]) as (port_find):
new_port = self.plugin.disassociate_port(self.context, 1, 1)
port_find.assert_called_with(self.context,
id=1,
ip_address_id=[1],
scope=quark_db_api.ONE)
self.assertEqual(new_port["fixed_ips"], [])
def test_port_disassociate_port_not_found_fails(self):
with self._stubs(port=None):
with self.assertRaises(exceptions.PortNotFound):
self.plugin.disassociate_port(self.context, 1, 1)

View File

@@ -21,10 +21,8 @@ import mock
from neutron.api.v2 import attributes as neutron_attrs
from neutron.common import exceptions
from neutron.db import api as db_api
from neutron.extensions import securitygroup as sg_ext
from oslo.config import cfg
from quark.db import api as quark_db_api
from quark.db import models
from quark import exceptions as quark_exceptions
import quark.plugin
@@ -902,671 +900,6 @@ class TestQuarkCreateNetwork(TestQuarkPlugin):
self.assertEqual(net["tenant_id"], 0)
class TestIpAddresses(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port, addr):
port_model = None
addr_model = None
if port:
port_model = models.Port()
port_model.update(port)
if addr:
addr_model = models.IPAddress()
addr_model.update(addr)
with contextlib.nested(
mock.patch("quark.db.api.port_find"),
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address")
) as (port_find, alloc_ip):
port_find.return_value = port_model
alloc_ip.return_value = addr_model
yield
def test_create_ip_address_by_network_and_device(self):
port = dict(id=1, network_id=2, ip_addresses=[])
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4,
tenant_id=self.context.tenant_id)
with self._stubs(port=port, addr=ip):
ip_address = dict(network_id=ip["network_id"],
device_ids=[4])
response = self.plugin.create_ip_address(
self.context, dict(ip_address=ip_address))
self.assertIsNotNone(response["id"])
self.assertEqual(response["network_id"], ip_address["network_id"])
self.assertEqual(response["device_ids"], [""])
self.assertEqual(response["port_ids"], [port["id"]])
self.assertEqual(response["subnet_id"], ip["subnet_id"])
self.assertEqual(response["tenant_id"], self.context.tenant_id)
self.assertFalse(response["shared"])
self.assertEqual(response["version"], 4)
self.assertEqual(response["address"], "192.168.1.100")
def test_create_ip_address_with_port(self):
port = dict(id=1, network_id=2, ip_addresses=[])
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(port=port, addr=ip):
ip_address = dict(port_ids=[port["id"]])
response = self.plugin.create_ip_address(
self.context, dict(ip_address=ip_address))
self.assertIsNotNone(response['id'])
self.assertEqual(response['network_id'], ip["network_id"])
self.assertEqual(response['port_ids'], [port["id"]])
self.assertEqual(response['subnet_id'], ip['id'])
def test_create_ip_address_by_device_no_network_fails(self):
with self._stubs(port={}, addr=None):
ip_address = dict(device_ids=[4])
with self.assertRaises(exceptions.BadRequest):
self.plugin.create_ip_address(self.context,
dict(ip_address=ip_address))
def test_create_ip_address_invalid_network_and_device(self):
with self._stubs(port=None, addr=None):
with self.assertRaises(exceptions.PortNotFound):
ip_address = {'ip_address': {'network_id': 'fake',
'device_id': 'fake'}}
self.plugin.create_ip_address(self.context, ip_address)
def test_create_ip_address_invalid_port(self):
with self._stubs(port=None, addr=None):
with self.assertRaises(exceptions.PortNotFound):
ip_address = {'ip_address': {'port_id': 'fake'}}
self.plugin.create_ip_address(self.context, ip_address)
class TestQuarkUpdateIPAddress(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, ports, addr, addr_ports=False):
port_models = []
addr_model = None
for port in ports:
port_model = models.Port()
port_model.update(port)
port_models.append(port_model)
if addr:
addr_model = models.IPAddress()
addr_model.update(addr)
if addr_ports:
addr_model.ports = port_models
db_mod = "quark.db.api"
with contextlib.nested(
mock.patch("%s.port_find" % db_mod),
mock.patch("%s.ip_address_find" % db_mod),
) as (port_find, ip_find):
port_find.return_value = port_models
ip_find.return_value = addr_model
yield
def test_update_ip_address_does_not_exist(self):
with self._stubs(ports=[], addr=None):
with self.assertRaises(exceptions.NotFound):
self.plugin.update_ip_address(self.context,
'no_ip_address_id',
{'ip_address': {'port_ids': []}})
def test_update_ip_address_port_not_found(self):
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(ports=[], addr=ip):
with self.assertRaises(exceptions.NotFound):
ip_address = {'ip_address': {'port_ids': ['fake']}}
self.plugin.update_ip_address(self.context,
ip["id"],
ip_address)
def test_update_ip_address_specify_ports(self):
port = dict(id=1, network_id=2, ip_addresses=[])
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(ports=[port], addr=ip):
ip_address = {'ip_address': {'port_ids': [port['id']]}}
response = self.plugin.update_ip_address(self.context,
ip['id'],
ip_address)
self.assertEqual(response['port_ids'], [port['id']])
def test_update_ip_address_no_ports(self):
port = dict(id=1, network_id=2, ip_addresses=[])
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(ports=[port], addr=ip):
ip_address = {'ip_address': {}}
response = self.plugin.update_ip_address(self.context,
ip['id'],
ip_address)
self.assertEqual(response['port_ids'], [])
def test_update_ip_address_empty_ports_delete(self):
port = dict(id=1, network_id=2, ip_addresses=[])
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(ports=[port], addr=ip, addr_ports=True):
ip_address = {'ip_address': {'port_ids': []}}
response = self.plugin.update_ip_address(self.context,
ip['id'],
ip_address)
self.assertEqual(response['port_ids'], [])
class TestQuarkGetPorts(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, ports=None, addrs=None):
port_models = []
addr_models = None
if addrs:
addr_models = []
for address in addrs:
a = models.IPAddress()
a.update(address)
addr_models.append(a)
if isinstance(ports, list):
for port in ports:
port_model = models.Port()
port_model.update(port)
if addr_models:
port_model.ip_addresses = addr_models
port_models.append(port_model)
elif ports is None:
port_models = None
else:
port_model = models.Port()
port_model.update(ports)
if addr_models:
port_model.ip_addresses = addr_models
port_models = port_model
db_mod = "quark.db.api"
with contextlib.nested(
mock.patch("%s.port_find" % db_mod)
) as (port_find,):
port_find.return_value = port_models
yield
def test_port_list_no_ports(self):
with self._stubs(ports=[]):
ports = self.plugin.get_ports(self.context, filters=None,
fields=None)
self.assertEqual(ports, [])
def test_port_list_with_ports(self):
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
port = dict(mac_address="aa:bb:cc:dd:ee:ff", network_id=1,
tenant_id=self.context.tenant_id, device_id=2)
expected = {'status': None,
'device_owner': None,
'mac_address': 'aa:bb:cc:dd:ee:ff',
'network_id': 1,
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'device_id': 2}
with self._stubs(ports=[port], addrs=[ip]):
ports = self.plugin.get_ports(self.context, filters=None,
fields=None)
self.assertEqual(len(ports), 1)
fixed_ips = ports[0].pop("fixed_ips")
for key in expected.keys():
self.assertEqual(ports[0][key], expected[key])
self.assertEqual(fixed_ips[0]["subnet_id"], ip["subnet_id"])
self.assertEqual(fixed_ips[0]["ip_address"],
ip["address_readable"])
def test_port_show(self):
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
port = dict(mac_address="AA:BB:CC:DD:EE:FF", network_id=1,
tenant_id=self.context.tenant_id, device_id=2)
expected = {'status': None,
'device_owner': None,
'mac_address': 'AA:BB:CC:DD:EE:FF',
'network_id': 1,
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'device_id': 2}
with self._stubs(ports=port, addrs=[ip]):
result = self.plugin.get_port(self.context, 1)
fixed_ips = result.pop("fixed_ips")
for key in expected.keys():
self.assertEqual(result[key], expected[key])
self.assertEqual(fixed_ips[0]["subnet_id"], ip["subnet_id"])
self.assertEqual(fixed_ips[0]["ip_address"],
ip["address_readable"])
def test_port_show_with_int_mac(self):
port = dict(mac_address=187723572702975L, network_id=1,
tenant_id=self.context.tenant_id, device_id=2)
expected = {'status': None,
'device_owner': None,
'mac_address': 'aa:bb:cc:dd:ee:ff',
'network_id': 1,
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': [],
'device_id': 2}
with self._stubs(ports=port):
result = self.plugin.get_port(self.context, 1)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
def test_port_show_not_found(self):
with self._stubs(ports=None):
with self.assertRaises(exceptions.PortNotFound):
self.plugin.get_port(self.context, 1)
class TestQuarkCreatePort(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port=None, network=None, addr=None, mac=None):
port_model = models.Port()
port_model.update(port)
port_models = port_model
db_mod = "quark.db.api"
ipam = "quark.ipam.QuarkIpam"
with contextlib.nested(
mock.patch("%s.port_create" % db_mod),
mock.patch("%s.network_find" % db_mod),
mock.patch("%s.allocate_ip_address" % ipam),
mock.patch("%s.allocate_mac_address" % ipam),
) as (port_create, net_find, alloc_ip, alloc_mac):
port_create.return_value = port_models
net_find.return_value = network
alloc_ip.return_value = addr
alloc_mac.return_value = mac
yield port_create
def test_create_port(self):
network = dict(id=1)
mac = dict(address="aa:bb:cc:dd:ee:ff")
port_name = "foobar"
ip = dict()
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
name=port_name))
expected = {'status': None,
'name': port_name,
'device_owner': None,
'mac_address': mac["address"],
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': [],
'device_id': 2}
with self._stubs(port=port["port"], network=network, addr=ip,
mac=mac) as port_create:
result = self.plugin.create_port(self.context, port)
self.assertTrue(port_create.called)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
def test_create_port_mac_address_not_specified(self):
network = dict(id=1)
mac = dict(address="aa:bb:cc:dd:ee:ff")
ip = dict()
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2))
expected = {'status': None,
'device_owner': None,
'mac_address': mac["address"],
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': [],
'device_id': 2}
with self._stubs(port=port["port"], network=network, addr=ip,
mac=mac) as port_create:
port["port"]["mac_address"] = neutron_attrs.ATTR_NOT_SPECIFIED
result = self.plugin.create_port(self.context, port)
self.assertTrue(port_create.called)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
def test_create_port_fixed_ips(self):
network = dict(id=1)
mac = dict(address="aa:bb:cc:dd:ee:ff")
ip = mock.MagicMock()
ip.get = lambda x, *y: 1 if x == "subnet_id" else None
ip.formatted = lambda: "192.168.10.45"
fixed_ips = [dict(subnet_id=1, ip_address="192.168.10.45")]
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
fixed_ips=fixed_ips, ip_addresses=[ip]))
expected = {'status': None,
'device_owner': None,
'mac_address': mac["address"],
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': fixed_ips,
'device_id': 2}
with self._stubs(port=port["port"], network=network, addr=ip,
mac=mac) as port_create:
result = self.plugin.create_port(self.context, port)
self.assertTrue(port_create.called)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
def test_create_port_fixed_ips_bad_request(self):
network = dict(id=1)
mac = dict(address="aa:bb:cc:dd:ee:ff")
ip = mock.MagicMock()
ip.get = lambda x: 1 if x == "subnet_id" else None
ip.formatted = lambda: "192.168.10.45"
fixed_ips = [dict()]
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
fixed_ips=fixed_ips, ip_addresses=[ip]))
with self._stubs(port=port["port"], network=network, addr=ip,
mac=mac):
with self.assertRaises(exceptions.BadRequest):
self.plugin.create_port(self.context, port)
def test_create_port_no_network_found(self):
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2))
with self._stubs(network=None, port=port["port"]):
with self.assertRaises(exceptions.NetworkNotFound):
self.plugin.create_port(self.context, port)
def test_create_port_net_at_max(self):
network = dict(id=1, ports=[models.Port()])
mac = dict(address="aa:bb:cc:dd:ee:ff")
port_name = "foobar"
ip = dict()
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
name=port_name))
with self._stubs(port=port["port"], network=network, addr=ip, mac=mac):
with self.assertRaises(exceptions.OverQuota):
self.plugin.create_port(self.context, port)
def test_create_port_security_groups(self, groups=[1]):
network = dict(id=1)
mac = dict(address="aa:bb:cc:dd:ee:ff")
port_name = "foobar"
ip = dict()
group = models.SecurityGroup()
group.update({'id': 1, 'tenant_id': self.context.tenant_id,
'name': 'foo', 'description': 'bar'})
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
name=port_name, security_groups=[group]))
expected = {'status': None,
'name': port_name,
'device_owner': None,
'mac_address': mac["address"],
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': [],
'security_groups': groups,
'device_id': 2}
with self._stubs(port=port["port"], network=network, addr=ip,
mac=mac) as port_create:
with mock.patch("quark.db.api.security_group_find") as group_find:
group_find.return_value = (groups and group)
port["port"]["security_groups"] = groups or [1]
result = self.plugin.create_port(self.context, port)
self.assertTrue(port_create.called)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
def test_create_port_security_groups_not_found(self):
with self.assertRaises(sg_ext.SecurityGroupNotFound):
self.test_create_port_security_groups([])
class TestQuarkUpdatePort(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port):
port_model = None
if port:
port_model = models.Port()
port_model.update(port)
with contextlib.nested(
mock.patch("quark.db.api.port_find"),
mock.patch("quark.db.api.port_update"),
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address"),
mock.patch("quark.ipam.QuarkIpam.deallocate_ip_address")
) as (port_find, port_update, alloc_ip, dealloc_ip):
port_find.return_value = port_model
yield port_find, port_update, alloc_ip, dealloc_ip
def test_update_port_not_found(self):
with self._stubs(port=None):
with self.assertRaises(exceptions.PortNotFound):
self.plugin.update_port(self.context, 1, {})
def test_update_port(self):
with self._stubs(
port=dict(id=1, name="myport")
) as (port_find, port_update, alloc_ip, dealloc_ip):
new_port = dict(port=dict(name="ourport"))
self.plugin.update_port(self.context, 1, new_port)
self.assertEqual(port_find.call_count, 1)
port_update.assert_called_once_with(
self.context,
port_find(),
name="ourport",
security_groups=[])
def test_update_port_fixed_ip_bad_request(self):
with self._stubs(
port=dict(id=1, name="myport")
) as (port_find, port_update, alloc_ip, dealloc_ip):
new_port = dict(port=dict(
fixed_ips=[dict(subnet_id=None,
ip_address=None)]))
with self.assertRaises(exceptions.BadRequest):
self.plugin.update_port(self.context, 1, new_port)
def test_update_port_fixed_ip(self):
with self._stubs(
port=dict(id=1, name="myport", mac_address="0:0:0:0:0:1")
) as (port_find, port_update, alloc_ip, dealloc_ip):
new_port = dict(port=dict(
fixed_ips=[dict(subnet_id=1,
ip_address="1.1.1.1")]))
self.plugin.update_port(self.context, 1, new_port)
self.assertEqual(dealloc_ip.call_count, 1)
self.assertEqual(alloc_ip.call_count, 1)
class TestQuarkPostUpdatePort(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port, addr, addr2=None):
port_model = None
addr_model = None
addr_model2 = None
if port:
port_model = models.Port()
port_model.update(port)
if addr:
addr_model = models.IPAddress()
addr_model.update(addr)
if addr2:
addr_model2 = models.IPAddress()
addr_model2.update(addr2)
with contextlib.nested(
mock.patch("quark.db.api.port_find"),
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address"),
mock.patch("quark.db.api.ip_address_find")
) as (port_find, alloc_ip, ip_find):
port_find.return_value = port_model
alloc_ip.return_value = addr_model2 if addr_model2 else addr_model
ip_find.return_value = addr_model
yield port_find, alloc_ip, ip_find
def test_post_update_port_no_ports(self):
with self.assertRaises(exceptions.PortNotFound):
self.plugin.post_update_port(self.context, 1,
{"port": {"network_id": 1}})
def test_post_update_port_fixed_ips_empty_body(self):
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2))
with self._stubs(port=port, addr=None):
with self.assertRaises(exceptions.BadRequest):
self.plugin.post_update_port(self.context, 1, {})
with self.assertRaises(exceptions.BadRequest):
self.plugin.post_update_port(self.context, 1, {"port": {}})
def test_post_update_port_fixed_ips_ip(self):
new_port_ip = dict(port=dict(fixed_ips=[dict()]))
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2))
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, deallocated=True)
with self._stubs(port=port, addr=ip) as (port_find, alloc_ip, ip_find):
response = self.plugin.post_update_port(self.context, 1,
new_port_ip)
self.assertEqual(port_find.call_count, 1)
self.assertEqual(alloc_ip.call_count, 1)
self.assertEqual(ip_find.call_count, 0)
self.assertEqual(response["fixed_ips"][0]["ip_address"],
"192.168.1.100")
def test_post_update_port_fixed_ips_ip_id(self):
new_port_ip = dict(port=dict(fixed_ips=[dict(ip_id=1)]))
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2))
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, deallocated=True)
with self._stubs(port=port, addr=ip) as (port_find, alloc_ip, ip_find):
response = self.plugin.post_update_port(self.context, 1,
new_port_ip)
self.assertEqual(port_find.call_count, 1)
self.assertEqual(alloc_ip.call_count, 0)
self.assertEqual(ip_find.call_count, 1)
self.assertEqual(response["fixed_ips"][0]["ip_address"],
"192.168.1.100")
def test_post_update_port_fixed_ips_ip_address_exists(self):
new_port_ip = dict(port=dict(fixed_ips=[dict(
ip_address="192.168.1.100")]))
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2))
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, deallocated=True)
with self._stubs(port=port, addr=ip) as (port_find, alloc_ip, ip_find):
response = self.plugin.post_update_port(self.context, 1,
new_port_ip)
self.assertEqual(port_find.call_count, 1)
self.assertEqual(alloc_ip.call_count, 0)
self.assertEqual(ip_find.call_count, 1)
self.assertEqual(response["fixed_ips"][0]["ip_address"],
"192.168.1.100")
def test_post_update_port_fixed_ips_ip_address_doesnt_exist(self):
new_port_ip = dict(port=dict(fixed_ips=[dict(
ip_address="192.168.1.101")]))
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2))
ip = dict(id=1, address=3232235876, address_readable="192.168.1.101",
subnet_id=1, network_id=2, version=4, deallocated=True)
with self._stubs(port=port, addr=None, addr2=ip) as \
(port_find, alloc_ip, ip_find):
response = self.plugin.post_update_port(self.context, 1,
new_port_ip)
self.assertEqual(port_find.call_count, 1)
self.assertEqual(alloc_ip.call_count, 1)
self.assertEqual(ip_find.call_count, 1)
self.assertEqual(response["fixed_ips"][0]["ip_address"],
"192.168.1.101")
class TestQuarkGetPortCount(TestQuarkPlugin):
def test_get_port_count(self):
"""This isn't really testable."""
with mock.patch("quark.db.api.port_count_all"):
self.plugin.get_ports_count(self.context, {})
class TestQuarkDeletePort(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port=None, addr=None, mac=None):
port_models = None
if port:
port_model = models.Port()
port_model.update(port)
port_models = port_model
db_mod = "quark.db.api"
ipam = "quark.ipam.QuarkIpam"
with contextlib.nested(
mock.patch("%s.port_find" % db_mod),
mock.patch("%s.deallocate_ip_address" % ipam),
mock.patch("%s.deallocate_mac_address" % ipam),
mock.patch("%s.port_delete" % db_mod),
mock.patch("quark.drivers.base.BaseDriver.delete_port")
) as (port_find, dealloc_ip, dealloc_mac, db_port_del,
driver_port_del):
port_find.return_value = port_models
dealloc_ip.return_value = addr
dealloc_mac.return_value = mac
yield db_port_del, driver_port_del
def test_port_delete(self):
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2, mac_address="AA:BB:CC:DD:EE:FF",
backend_key="foo"))
with self._stubs(port=port["port"]) as (db_port_del, driver_port_del):
self.plugin.delete_port(self.context, 1)
self.assertTrue(db_port_del.called)
driver_port_del.assert_called_with(self.context, "foo")
def test_port_delete_port_not_found_fails(self):
with self._stubs(port=None) as (db_port_del, driver_port_del):
with self.assertRaises(exceptions.PortNotFound):
self.plugin.delete_port(self.context, 1)
class TestQuarkDisassociatePort(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port=None):
port_models = None
if port:
port_model = models.Port()
port_model.update(port)
for ip in port["fixed_ips"]:
port_model.ip_addresses.append(models.IPAddress(
id=1,
address=ip["ip_address"],
subnet_id=ip["subnet_id"]))
port_models = port_model
db_mod = "quark.db.api"
with mock.patch("%s.port_find" % db_mod) as port_find:
port_find.return_value = port_models
yield port_find
def test_port_disassociate_port(self):
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
fixed_ips = [{"subnet_id": ip["subnet_id"],
"ip_address": ip["address_readable"]}]
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2, mac_address="AA:BB:CC:DD:EE:FF",
backend_key="foo", fixed_ips=fixed_ips))
with self._stubs(port=port["port"]) as (port_find):
new_port = self.plugin.disassociate_port(self.context, 1, 1)
port_find.assert_called_with(self.context,
id=1,
ip_address_id=[1],
scope=quark_db_api.ONE)
self.assertEqual(new_port["fixed_ips"], [])
def test_port_disassociate_port_not_found_fails(self):
with self._stubs(port=None):
with self.assertRaises(exceptions.PortNotFound):
self.plugin.disassociate_port(self.context, 1, 1)
class TestQuarkGetRoutes(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, routes):
@@ -1676,65 +1009,3 @@ class TestQuarkDeleteRoutes(TestQuarkPlugin):
with self._stubs(route=None):
with self.assertRaises(quark_exceptions.RouteNotFound):
self.plugin.delete_route(self.context, 1)
class TestQuarkGetIpAddresses(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, ips, ports):
with mock.patch("quark.db.api.ip_address_find") as ip_find:
ip_models = []
port_models = []
for port in ports:
p = models.Port()
p.update(port)
port_models.append(p)
if isinstance(ips, list):
for ip in ips:
version = ip.pop("version")
ip_mod = models.IPAddress()
ip_mod.update(ip)
ip_mod.version = version
ip_mod.ports = port_models
ip_models.append(ip_mod)
ip_find.return_value = ip_models
else:
if ips:
version = ips.pop("version")
ip_mod = models.IPAddress()
ip_mod.update(ips)
ip_mod.version = version
ip_mod.ports = port_models
ip_find.return_value = ip_mod
else:
ip_find.return_value = ips
yield
def test_get_ip_addresses(self):
port = dict(id=100, device_id="foobar")
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(ips=[ip], ports=[port]):
res = self.plugin.get_ip_addresses(self.context)
addr_res = res[0]
self.assertEqual(ip["id"], addr_res["id"])
self.assertEqual(ip["subnet_id"], addr_res["subnet_id"])
self.assertEqual(ip["address_readable"], addr_res["address"])
self.assertEqual(addr_res["port_ids"][0], port["id"])
self.assertEqual(addr_res["device_ids"][0], port["device_id"])
def test_get_ip_address(self):
port = dict(id=100)
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(ips=ip, ports=[port]):
res = self.plugin.get_ip_address(self.context, 1)
self.assertEqual(ip["id"], res["id"])
self.assertEqual(ip["subnet_id"], res["subnet_id"])
self.assertEqual(ip["address_readable"], res["address"])
self.assertEqual(res["port_ids"][0], port["id"])
def test_get_ip_address_no_ip_fails(self):
port = dict(id=100)
with self._stubs(ips=None, ports=[port]):
with self.assertRaises(quark_exceptions.IpAddressNotFound):
self.plugin.get_ip_address(self.context, 1)

27
quark/utils.py Normal file
View File

@@ -0,0 +1,27 @@
# Copyright 2013 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.v2 import attributes
def attr_specified(param):
return param is not attributes.ATTR_NOT_SPECIFIED
def pop_param(attrs, param, default=None):
val = attrs.pop(param, default)
if attr_specified(val):
return val
return default