- change IPPolicy to use CIDRs for exclusion
- no longer allow offset/lengths
- changed IPPolicyRule to IPPolicyCIDR
- default CIDRs excluded are network ip and broadcast ip, these are detected and added in the IPPolicy model.
- no longer allow configurable default policies becase we can't detect if subnet is IPv4 or IPv6 at configuration time
- We are more strict about only allowing a user to provide either network_ids or subnet_ids.  We will throw an exception if they provide both.
This commit is contained in:
jmeridth
2013-12-13 14:53:37 +00:00
parent 01dcbfbde3
commit 24bb7bed88
11 changed files with 278 additions and 182 deletions

View File

@@ -516,11 +516,10 @@ def security_group_rule_delete(context, rule):
def ip_policy_create(context, **ip_policy_dict): def ip_policy_create(context, **ip_policy_dict):
new_policy = models.IPPolicy() new_policy = models.IPPolicy()
ranges = ip_policy_dict.pop("exclude") exclude = ip_policy_dict.pop("exclude")
for arange in ranges: for excluded_cidr in exclude:
new_policy["exclude"].append(models.IPPolicyRange( new_policy["exclude"].append(
offset=arange["offset"], models.IPPolicyCIDR(cidr=excluded_cidr))
length=arange["length"]))
new_policy.update(ip_policy_dict) new_policy.update(ip_policy_dict)
new_policy["tenant_id"] = context.tenant_id new_policy["tenant_id"] = context.tenant_id
@@ -536,13 +535,12 @@ def ip_policy_find(context, **filters):
def ip_policy_update(context, ip_policy, **ip_policy_dict): def ip_policy_update(context, ip_policy, **ip_policy_dict):
ranges = ip_policy_dict.pop("exclude", []) exclude = ip_policy_dict.pop("exclude", [])
if ranges: if exclude:
ip_policy["exclude"] = [] ip_policy["exclude"] = []
for arange in ranges: for excluded_cidr in exclude:
ip_policy["exclude"].append(models.IPPolicyRange( ip_policy["exclude"].append(
offset=arange["offset"], models.IPPolicyCIDR(cidr=excluded_cidr))
length=arange["length"]))
ip_policy.update(ip_policy_dict) ip_policy.update(ip_policy_dict)
context.session.add(ip_policy) context.session.add(ip_policy)

View File

@@ -27,26 +27,13 @@ from neutron.db import models_v2 as models
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.openstack.common import timeutils from neutron.openstack.common import timeutils
from oslo.config import cfg
from quark.db import custom_types from quark.db import custom_types
#NOTE(mdietz): This is the only way to actually create the quotas table, #NOTE(mdietz): This is the only way to actually create the quotas table,
# regardless if we need it. This is how it's done upstream. # regardless if we need it. This is how it's done upstream.
from quark import quota_driver # noqa
import json
HasId = models.HasId HasId = models.HasId
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF
quark_opts = [
cfg.StrOpt('default_ip_policy',
default='{"exclude": [{"offset": -1, "length": 3}]}',
help=_("Default IP allocation policy"))
]
CONF.register_opts(quark_opts, "QUARK")
def _default_list_getset(collection_class, proxy): def _default_list_getset(collection_class, proxy):
@@ -371,64 +358,44 @@ class IPPolicy(BASEV2, models.HasId, models.HasTenant):
primaryjoin="IPPolicy.id==Subnet.ip_policy_id", primaryjoin="IPPolicy.id==Subnet.ip_policy_id",
backref="ip_policy") backref="ip_policy")
exclude = orm.relationship( exclude = orm.relationship(
"IPPolicyRange", "IPPolicyCIDR",
primaryjoin="IPPolicy.id==IPPolicyRange.ip_policy_id", primaryjoin="IPPolicy.id==IPPolicyCIDR.ip_policy_id",
backref="ip_policy") backref="ip_policy")
name = sa.Column(sa.String(255), nullable=True) name = sa.Column(sa.String(255), nullable=True)
description = sa.Column(sa.String(255), nullable=True) description = sa.Column(sa.String(255), nullable=True)
class JSONIPPolicy(object):
def __init__(self, policy=None):
self.policy = {}
if not policy:
self._compile_policy(CONF.QUARK.default_ip_policy)
else:
self._compile_policy(policy)
def _compile_policy(self, policy):
self.policy = json.loads(policy)
def __getattr__(self, name):
return getattr(self.policy, name)
DEFAULT_POLICY = JSONIPPolicy()
@staticmethod @staticmethod
def get_ip_policy_rule_set(subnet): def get_ip_policy_cidrs(subnet):
ip_policy = subnet["ip_policy"] or \ ip_policy = subnet["ip_policy"] or \
subnet["network"]["ip_policy"] or \ subnet["network"]["ip_policy"] or \
dict() dict()
ip_policy_ranges = ip_policy.get("exclude", []) + \
IPPolicy.DEFAULT_POLICY.get("exclude", [])
ip_policy_rules = netaddr.IPSet() subnet_cidr = netaddr.IPNetwork(subnet["cidr"])
subnet_net = netaddr.IPNetwork(subnet["cidr"]) network_ip = subnet_cidr.network
broadcast_ip = subnet_cidr.broadcast
prefix_len = '32' if subnet_cidr.version == 4 else '128'
default_policy_cidrs = ["%s/%s" % (network_ip, prefix_len),
"%s/%s" % (broadcast_ip, prefix_len)]
ip_policy_cidrs = []
ip_policies = ip_policy.get("exclude", [])
if ip_policies:
ip_policy_cidrs = [ip_policy_cidr.cidr
for ip_policy_cidr in ip_policies]
def _policy_set(offset, length): ip_policy_cidrs = ip_policy_cidrs + default_policy_cidrs
start = subnet_net.first + offset
end = start + length
return netaddr.IPSet(netaddr.IPRange(start, end - 1))
for arange in ip_policy_ranges: ip_set = netaddr.IPSet()
offset, length = arange["offset"], arange["length"] for cidr in ip_policy_cidrs:
if offset < 0: ip_set.add(cidr)
if offset + length > 0:
ip_policy_rules |= _policy_set(0, offset + length)
pos_offset = subnet_net.size + offset
capped_length = min(length, -offset)
ip_policy_rules |= _policy_set(pos_offset, capped_length)
else:
ip_policy_rules |= _policy_set(offset, length)
return ip_policy_rules return ip_set
class IPPolicyRange(BASEV2, models.HasId): class IPPolicyCIDR(BASEV2, models.HasId):
__tablename__ = "quark_ip_policy_rules" __tablename__ = "quark_ip_policy_cidrs"
ip_policy_id = sa.Column(sa.String(36), sa.ForeignKey( ip_policy_id = sa.Column(sa.String(36), sa.ForeignKey(
"quark_ip_policy.id", ondelete="CASCADE")) "quark_ip_policy.id", ondelete="CASCADE"))
offset = sa.Column(sa.Integer()) cidr = sa.Column(sa.String(64))
length = sa.Column(sa.Integer())
class Network(BASEV2, models.HasId): class Network(BASEV2, models.HasId):

View File

@@ -139,7 +139,7 @@ class QuarkIpam(object):
return ip_addresses return ip_addresses
def _iterate_until_available_ip(self, context, subnet, network_id, def _iterate_until_available_ip(self, context, subnet, network_id,
ip_policy_rules): ip_policy_cidrs):
address = True address = True
while address: while address:
next_ip_int = int(subnet["next_auto_assign_ip"]) next_ip_int = int(subnet["next_auto_assign_ip"])
@@ -147,7 +147,7 @@ class QuarkIpam(object):
if subnet["ip_version"] == 4: if subnet["ip_version"] == 4:
next_ip = next_ip.ipv4() next_ip = next_ip.ipv4()
subnet["next_auto_assign_ip"] = next_ip_int + 1 subnet["next_auto_assign_ip"] = next_ip_int + 1
if ip_policy_rules and next_ip in ip_policy_rules: if ip_policy_cidrs and next_ip in ip_policy_cidrs:
continue continue
address = db_api.ip_address_find( address = db_api.ip_address_find(
context, network_id=network_id, ip_address=next_ip, context, network_id=network_id, ip_address=next_ip,
@@ -164,8 +164,7 @@ class QuarkIpam(object):
ip_address=None): ip_address=None):
new_addresses = [] new_addresses = []
for subnet in subnets: for subnet in subnets:
ip_policy_rules = models.IPPolicy.get_ip_policy_rule_set( ip_policy_cidrs = models.IPPolicy.get_ip_policy_cidrs(subnet)
subnet)
# Creating this IP for the first time # Creating this IP for the first time
next_ip = None next_ip = None
if ip_address: if ip_address:
@@ -178,7 +177,7 @@ class QuarkIpam(object):
net_id=net_id) net_id=net_id)
else: else:
next_ip = self._iterate_until_available_ip( next_ip = self._iterate_until_available_ip(
context, subnet, net_id, ip_policy_rules) context, subnet, net_id, ip_policy_cidrs)
context.session.add(subnet) context.session.add(subnet)
address = db_api.ip_address_create( address = db_api.ip_address_create(
@@ -289,11 +288,10 @@ class QuarkIpam(object):
ipnet = netaddr.IPNetwork(subnet["cidr"]) ipnet = netaddr.IPNetwork(subnet["cidr"])
if ip_address and ip_address not in ipnet: if ip_address and ip_address not in ipnet:
continue continue
ip_policy_rules = None ip_policy_cidrs = None
if not ip_address: if not ip_address:
ip_policy_rules = models.IPPolicy.get_ip_policy_rule_set( ip_policy_cidrs = models.IPPolicy.get_ip_policy_cidrs(subnet)
subnet) policy_size = ip_policy_cidrs.size if ip_policy_cidrs else 0
policy_size = ip_policy_rules.size if ip_policy_rules else 0
if ipnet.size > (ips_in_subnet + policy_size): if ipnet.size > (ips_in_subnet + policy_size):
return subnet return subnet

View File

@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import netaddr
from neutron.common import exceptions from neutron.common import exceptions
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from oslo.config import cfg from oslo.config import cfg
@@ -32,11 +33,17 @@ def create_ip_policy(context, ip_policy):
if not ipp.get("exclude"): if not ipp.get("exclude"):
raise exceptions.BadRequest(resource="ip_policy", raise exceptions.BadRequest(resource="ip_policy",
msg="Empty ip_policy.exclude regions") msg="Empty ip_policy.exclude")
ip_policy_cidrs = ipp.get("exclude", [])
network_ids = ipp.get("network_ids") network_ids = ipp.get("network_ids")
subnet_ids = ipp.get("subnet_ids") subnet_ids = ipp.get("subnet_ids")
if subnet_ids and network_ids:
raise exceptions.BadRequest(
resource="ip_policy",
msg="network_ids and subnet_ids specified. only one allowed")
if not subnet_ids and not network_ids: if not subnet_ids and not network_ids:
raise exceptions.BadRequest( raise exceptions.BadRequest(
resource="ip_policy", resource="ip_policy",
@@ -49,6 +56,8 @@ def create_ip_policy(context, ip_policy):
context, id=subnet_ids, scope=db_api.ALL) context, id=subnet_ids, scope=db_api.ALL)
if not subnets: if not subnets:
raise exceptions.SubnetNotFound(id=subnet_ids) raise exceptions.SubnetNotFound(id=subnet_ids)
if ip_policy_cidrs:
_validate_cidrs_fit_into_subnets(ip_policy_cidrs, subnets)
models.extend(subnets) models.extend(subnets)
if network_ids: if network_ids:
@@ -56,6 +65,10 @@ def create_ip_policy(context, ip_policy):
context, id=network_ids, scope=db_api.ALL) context, id=network_ids, scope=db_api.ALL)
if not nets: if not nets:
raise exceptions.NetworkNotFound(net_id=network_ids) raise exceptions.NetworkNotFound(net_id=network_ids)
subnets = [subnet for net in nets
for subnet in net.get("subnets", [])]
if ip_policy_cidrs and subnets:
_validate_cidrs_fit_into_subnets(ip_policy_cidrs, subnets)
models.extend(nets) models.extend(nets)
for model in models: for model in models:
@@ -91,9 +104,15 @@ def update_ip_policy(context, id, ip_policy):
if not ipp_db: if not ipp_db:
raise quark_exceptions.IPPolicyNotFound(id=id) raise quark_exceptions.IPPolicyNotFound(id=id)
ip_policy_cidrs = ipp.get("exclude", [])
network_ids = ipp.get("network_ids") network_ids = ipp.get("network_ids")
subnet_ids = ipp.get("subnet_ids") subnet_ids = ipp.get("subnet_ids")
if subnet_ids and network_ids:
raise exceptions.BadRequest(
resource="ip_policy",
msg="network_ids and subnet_ids specified. only one allowed")
models = [] models = []
if subnet_ids: if subnet_ids:
for subnet in ipp_db["subnets"]: for subnet in ipp_db["subnets"]:
@@ -102,6 +121,8 @@ def update_ip_policy(context, id, ip_policy):
context, id=subnet_ids, scope=db_api.ALL) context, id=subnet_ids, scope=db_api.ALL)
if len(subnets) != len(subnet_ids): if len(subnets) != len(subnet_ids):
raise exceptions.SubnetNotFound(id=subnet_ids) raise exceptions.SubnetNotFound(id=subnet_ids)
if ip_policy_cidrs:
_validate_cidrs_fit_into_subnets(ip_policy_cidrs, subnets)
models.extend(subnets) models.extend(subnets)
if network_ids: if network_ids:
@@ -111,6 +132,10 @@ def update_ip_policy(context, id, ip_policy):
scope=db_api.ALL) scope=db_api.ALL)
if len(nets) != len(network_ids): if len(nets) != len(network_ids):
raise exceptions.NetworkNotFound(net_id=network_ids) raise exceptions.NetworkNotFound(net_id=network_ids)
subnets = [subnet for net in nets
for subnet in net.get("subnets", [])]
if ip_policy_cidrs and subnets:
_validate_cidrs_fit_into_subnets(ip_policy_cidrs, subnets)
models.extend(nets) models.extend(nets)
for model in models: for model in models:
@@ -132,3 +157,17 @@ def delete_ip_policy(context, id):
if ipp["networks"] or ipp["subnets"]: if ipp["networks"] or ipp["subnets"]:
raise quark_exceptions.IPPolicyInUse(id=id) raise quark_exceptions.IPPolicyInUse(id=id)
db_api.ip_policy_delete(context, ipp) db_api.ip_policy_delete(context, ipp)
def _validate_cidrs_fit_into_subnets(cidrs, subnets):
LOG.info("validate_cidrs_all_fit_into_subnets with CIDRs (%s) "
"and subnets (%s)" % (cidrs, subnets))
for cidr in cidrs:
cidr = netaddr.IPNetwork(cidr)
for subnet in subnets:
subnet_cidr = netaddr.IPNetwork(subnet["cidr"])
if cidr.version == subnet_cidr.version and cidr not in subnet_cidr:
raise exceptions.BadRequest(
resource="ip_policy",
msg="CIDR %s not in subnet CIDR %s"
% (cidr, subnet_cidr))

View File

@@ -125,18 +125,12 @@ def create_subnet(context, subnet):
context, ip=netaddr.IPAddress(dns_ip))) context, ip=netaddr.IPAddress(dns_ip)))
if isinstance(allocation_pools, list): if isinstance(allocation_pools, list):
ranges = []
cidrset = netaddr.IPSet([netaddr.IPNetwork(new_subnet["cidr"])]) cidrset = netaddr.IPSet([netaddr.IPNetwork(new_subnet["cidr"])])
for p in allocation_pools: for p in allocation_pools:
cidrset -= netaddr.IPSet(netaddr.IPRange(p["start"], p["end"])) cidrset -= netaddr.IPSet(netaddr.IPRange(p["start"], p["end"]))
non_allocation_pools = v._pools_from_cidr(cidrset) cidrs = [str(x.cidr) for x in cidrset.iter_cidrs()]
for p in non_allocation_pools:
r = netaddr.IPRange(p["start"], p["end"])
ranges.append(dict(
length=len(r),
offset=int(r[0]) - int(cidr[0])))
new_subnet["ip_policy"] = db_api.ip_policy_create(context, new_subnet["ip_policy"] = db_api.ip_policy_create(context,
exclude=ranges) exclude=cidrs)
subnet_dict = v._make_subnet_dict(new_subnet, subnet_dict = v._make_subnet_dict(new_subnet,
default_route=routes.DEFAULT_ROUTE) default_route=routes.DEFAULT_ROUTE)

View File

@@ -75,9 +75,9 @@ def _make_subnet_dict(subnet, default_route=None, fields=None):
net_id = STRATEGY.get_parent_network(subnet["network_id"]) net_id = STRATEGY.get_parent_network(subnet["network_id"])
def _allocation_pools(subnet): def _allocation_pools(subnet):
ip_policy_rules = models.IPPolicy.get_ip_policy_rule_set(subnet) ip_policy_cidrs = models.IPPolicy.get_ip_policy_cidrs(subnet)
cidr = netaddr.IPSet([netaddr.IPNetwork(subnet["cidr"])]) cidr = netaddr.IPSet([netaddr.IPNetwork(subnet["cidr"])])
allocatable = cidr - ip_policy_rules allocatable = cidr - ip_policy_cidrs
return _pools_from_cidr(allocatable) return _pools_from_cidr(allocatable)
res = {"id": subnet.get("id"), res = {"id": subnet.get("id"),
@@ -212,14 +212,12 @@ def _make_ip_dict(address):
def _make_ip_policy_dict(ipp): def _make_ip_policy_dict(ipp):
excludes = [dict(offset=range["offset"], length=range["length"])
for range in ipp["exclude"]]
return {"id": ipp["id"], return {"id": ipp["id"],
"tenant_id": ipp["tenant_id"], "tenant_id": ipp["tenant_id"],
"name": ipp["name"], "name": ipp["name"],
"subnet_ids": [s["id"] for s in ipp["subnets"]], "subnet_ids": [s["id"] for s in ipp["subnets"]],
"network_ids": [n["id"] for n in ipp["networks"]], "network_ids": [n["id"] for n in ipp["networks"]],
"exclude": excludes} "exclude": ipp["exclude"]}
def make_security_group_list(context, group_ids): def make_security_group_list(context, group_ids):

View File

@@ -61,4 +61,4 @@ class QuarkIPAddressAllocate(QuarkIpamBaseFunctionalTest):
ipaddress = self.ipam.allocate_ip_address(self.context, net["id"], ipaddress = self.ipam.allocate_ip_address(self.context, net["id"],
0, 0) 0, 0)
self.assertIsNotNone(ipaddress[0]['id']) self.assertIsNotNone(ipaddress[0]['id'])
self.assertEqual(ipaddress[0]['address'], 2) self.assertEqual(ipaddress[0]['address'], 1)

View File

@@ -14,12 +14,11 @@
# under the License. # under the License.
import contextlib import contextlib
import mock import mock
import netaddr
from neutron.common import exceptions from neutron.common import exceptions
from quark import exceptions as quark_exceptions from quark import exceptions as quark_exceptions
from quark.plugin_modules import ip_policies as ippol
from quark.tests import test_quark_plugin from quark.tests import test_quark_plugin
@@ -43,7 +42,7 @@ class TestQuarkGetIpPolicies(test_quark_plugin.TestQuarkPlugin):
name="foo", name="foo",
subnets=[dict(id=1)], subnets=[dict(id=1)],
networks=[dict(id=2)], networks=[dict(id=2)],
exclude=[dict(offset=1, length=256)]) exclude=["0.0.0.0/32"])
with self._stubs(ip_policy): with self._stubs(ip_policy):
resp = self.plugin.get_ip_policy(self.context, 1) resp = self.plugin.get_ip_policy(self.context, 1)
self.assertEqual(len(resp.keys()), 6) self.assertEqual(len(resp.keys()), 6)
@@ -61,7 +60,7 @@ class TestQuarkGetIpPolicies(test_quark_plugin.TestQuarkPlugin):
name="foo", name="foo",
subnets=[dict(id=1)], subnets=[dict(id=1)],
networks=[dict(id=2)], networks=[dict(id=2)],
exclude=[dict(offset=1, length=256)]) exclude=["0.0.0.0/32"])
with self._stubs([ip_policy]): with self._stubs([ip_policy]):
resp = self.plugin.get_ip_policies(self.context) resp = self.plugin.get_ip_policies(self.context)
self.assertEqual(len(resp), 1) self.assertEqual(len(resp), 1)
@@ -95,6 +94,12 @@ class TestQuarkCreateIpPolicies(test_quark_plugin.TestQuarkPlugin):
self.plugin.create_ip_policy(self.context, dict( self.plugin.create_ip_policy(self.context, dict(
ip_policy=dict())) ip_policy=dict()))
def test_create_ip_policy_with_both_network_and_subnet_ids(self):
with self._stubs(None):
with self.assertRaises(exceptions.BadRequest):
self.plugin.create_ip_policy(self.context, dict(
ip_policy=dict(network_ids=[1], subnet_ids=[1])))
def test_create_ip_policy_invalid_body_missing_netsubnet(self): def test_create_ip_policy_invalid_body_missing_netsubnet(self):
with self._stubs(None): with self._stubs(None):
with self.assertRaises(exceptions.BadRequest): with self.assertRaises(exceptions.BadRequest):
@@ -116,64 +121,95 @@ class TestQuarkCreateIpPolicies(test_quark_plugin.TestQuarkPlugin):
exclude=["1.1.1.1/24"]))) exclude=["1.1.1.1/24"])))
def test_create_ip_policy_network_ip_policy_already_exists(self): def test_create_ip_policy_network_ip_policy_already_exists(self):
with self._stubs(None, net=dict(id=1, ip_policy=dict(id=2))): with self._stubs(None, net=dict(id=1, ip_policy=dict(id=2),
subnets=[dict(id=1,
cidr="1.1.1.1/16")])):
with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists): with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists):
self.plugin.create_ip_policy(self.context, dict( self.plugin.create_ip_policy(self.context, dict(
ip_policy=dict(network_ids=[1], ip_policy=dict(network_ids=[1],
exclude=["1.1.1.1/24"]))) exclude=["1.1.1.1/24"])))
def test_create_ip_policy_subnet_ip_policy_already_exists(self): def test_create_ip_policy_subnet_ip_policy_already_exists(self):
with self._stubs(None, subnet=dict(id=1, ip_policy=dict(id=2))): with self._stubs(None, subnet=dict(id=1, ip_policy=dict(id=2),
cidr="1.1.1.1/16")):
with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists): with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists):
self.plugin.create_ip_policy(self.context, dict( self.plugin.create_ip_policy(self.context, dict(
ip_policy=dict(subnet_ids=[1], ip_policy=dict(subnet_ids=[1],
exclude=["1.1.1.1/24"]))) exclude=["1.1.1.1/24"])))
def test_create_ip_policy_network(self): def test_create_ip_policy_network(self):
ipp = dict(subnet_id=None, network_id=1, ipp = dict(subnet_id=None, network_id=1, exclude=["1.1.1.1/24"])
exclude=[dict(address=int(netaddr.IPAddress("1.1.1.1")), with self._stubs(ipp, net=dict(id=1, ip_policy=dict(id=2),
prefix=24)]) subnets=[dict(id=1,
with self._stubs(ipp, net=dict(id=1, ip_policy=dict(id=2))): cidr="1.1.1.1/16")])):
with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists): with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists):
resp = self.plugin.create_ip_policy(self.context, dict( self.plugin.create_ip_policy(self.context, dict(
ip_policy=dict(network_ids=[1], ip_policy=dict(network_ids=[ipp["network_id"]],
exclude=["1.1.1.1/24"]))) exclude=ipp["exclude"])))
self.assertEqual(len(resp.keys()), 3)
self.assertIsNone(resp["subnet_ids"])
self.assertEqual(resp["network_ids"], 1)
self.assertEqual(resp["exclude"], [dict()])
def test_create_ip_policy_subnet(self): def test_create_ip_policy_subnet(self):
ipp = dict(subnet_id=1, network_id=None, ipp = dict(subnet_id=1, network_id=None, exclude=["1.1.1.1/24"])
exclude=[dict(address=int(netaddr.IPAddress("1.1.1.1")), with self._stubs(ipp, subnet=dict(id=1, ip_policy=dict(id=2),
prefix=24)]) cidr="1.1.1.1/16")):
with self._stubs(ipp, subnet=dict(id=1, ip_policy=dict(id=2))):
with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists): with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists):
resp = self.plugin.create_ip_policy(self.context, dict( self.plugin.create_ip_policy(self.context, dict(
ip_policy=dict(subnet_ids=[1], ip_policy=dict(subnet_ids=[ipp["subnet_id"]],
exclude=["1.1.1.1/24"]))) exclude=ipp["exclude"])))
self.assertEqual(len(resp.keys()), 3)
self.assertEqual(resp["subnet_id"], 1)
self.assertIsNone(resp["network_id"])
self.assertEqual(resp["exclude"], ["1.1.1.1/24"])
def test_create_ip_policy(self): def test_create_ip_policy_with_cidr_that_does_not_fit_into_subnet(self):
ipp = dict( ipp = dict(
subnets=[dict(id=1)], subnets=[dict(id=1, version=4, cidr="192.168.1.1/24")],
networks=[], networks=[],
id=1, id=1,
tenant_id=1, tenant_id=1,
exclude=[dict(offset=0, length=256)], exclude=["10.10.10.100/32"],
name="foo") name="foo")
with self._stubs(ipp, subnet=dict(id=1, ip_policy=None)): with self._stubs(ipp, subnet=dict(id=1, ip_policy=None,
version=ipp["subnets"][0]["version"],
cidr=ipp["subnets"][0]["cidr"])):
with self.assertRaises(exceptions.BadRequest):
self.plugin.create_ip_policy(self.context, dict(
ip_policy=dict(subnet_ids=[1],
exclude=ipp["exclude"])))
def test_create_ip_policy_with_ipv6_subnet_cidr(self):
ipp = dict(
subnets=[dict(id=1, version=6, cidr='::/64')],
networks=[],
id=1,
tenant_id=1,
exclude=["::/128"],
name="foo")
with self._stubs(ipp, subnet=dict(id=1, ip_policy=None,
version=ipp["subnets"][0]["version"],
cidr=ipp["subnets"][0]["cidr"])):
resp = self.plugin.create_ip_policy(self.context, dict( resp = self.plugin.create_ip_policy(self.context, dict(
ip_policy=dict(subnet_ids=[1], ip_policy=dict(subnet_ids=[1],
exclude=[dict(offset=0, length=256)]))) exclude=ipp["exclude"])))
self.assertEqual(len(resp.keys()), 6) self.assertEqual(len(resp.keys()), 6)
self.assertEqual(resp["subnet_ids"], [1]) self.assertEqual(resp["subnet_ids"], [1])
self.assertEqual(resp["network_ids"], []) self.assertEqual(resp["network_ids"], [])
self.assertEqual(resp["exclude"], self.assertEqual(resp["exclude"], ["::/128"])
[dict(offset=0, length=256)]) self.assertEqual(resp["name"], "foo")
self.assertEqual(resp["tenant_id"], 1)
def test_create_ip_policy(self):
ipp = dict(
subnets=[dict(id=1, cidr='0.0.0.0/16')],
networks=[],
id=1,
tenant_id=1,
exclude=["0.0.0.0/24"],
name="foo")
with self._stubs(ipp, subnet=dict(id=1, ip_policy=None,
cidr=ipp["subnets"][0]["cidr"])):
resp = self.plugin.create_ip_policy(self.context, dict(
ip_policy=dict(subnet_ids=[1],
exclude=ipp["exclude"])))
self.assertEqual(len(resp.keys()), 6)
self.assertEqual(resp["subnet_ids"], [1])
self.assertEqual(resp["network_ids"], [])
self.assertEqual(resp["exclude"], ["0.0.0.0/24"])
self.assertEqual(resp["name"], "foo") self.assertEqual(resp["name"], "foo")
self.assertEqual(resp["tenant_id"], 1) self.assertEqual(resp["tenant_id"], 1)
@@ -198,37 +234,41 @@ class TestQuarkUpdateIpPolicies(test_quark_plugin.TestQuarkPlugin):
yield ip_policy_update yield ip_policy_update
def test_update_ip_policy_not_found(self): def test_update_ip_policy_not_found(self):
with self._stubs(None) as (ip_policy_update): with self._stubs(None):
with self.assertRaises(quark_exceptions.IPPolicyNotFound): with self.assertRaises(quark_exceptions.IPPolicyNotFound):
self.plugin.update_ip_policy(self.context, 1, self.plugin.update_ip_policy(self.context, 1,
dict(ip_policy=None)) dict(ip_policy=None))
self.assertEqual(ip_policy_update.called, 0)
def test_update_ip_policy_with_both_network_and_subnet_ids(self):
ipp = dict(id=1, subnets=[])
with self._stubs(ipp):
with self.assertRaises(exceptions.BadRequest):
self.plugin.update_ip_policy(self.context, 1, dict(
ip_policy=dict(network_ids=[1], subnet_ids=[1])))
def test_update_ip_policy_subnets_not_found(self): def test_update_ip_policy_subnets_not_found(self):
ipp = dict(id=1, subnets=[]) ipp = dict(id=1, subnets=[])
with self._stubs(ipp) as (ip_policy_update): with self._stubs(ipp):
with self.assertRaises(exceptions.SubnetNotFound): with self.assertRaises(exceptions.SubnetNotFound):
self.plugin.update_ip_policy( self.plugin.update_ip_policy(
self.context, self.context,
1, 1,
dict(ip_policy=dict(subnet_ids=[100]))) dict(ip_policy=dict(subnet_ids=[100])))
self.assertEqual(ip_policy_update.called, 0)
def test_update_ip_policy_subnets_already_exists(self): def test_update_ip_policy_subnets_already_exists(self):
ipp = dict(id=1, subnets=[dict()]) ipp = dict(id=1, subnets=[dict()])
with self._stubs( with self._stubs(
ipp, subnets=[dict(id=1, ip_policy=dict(id=1))] ipp, subnets=[dict(id=1, ip_policy=dict(id=1))]
) as (ip_policy_update): ):
with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists): with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists):
self.plugin.update_ip_policy( self.plugin.update_ip_policy(
self.context, self.context,
1, 1,
dict(ip_policy=dict(subnet_ids=[100]))) dict(ip_policy=dict(subnet_ids=[100])))
self.assertEqual(ip_policy_update.called, 0)
def test_update_ip_policy_subnets(self): def test_update_ip_policy_subnets(self):
ipp = dict(id=1, subnets=[dict()], ipp = dict(id=1, subnets=[dict()],
exclude=[dict(offset=0, length=256)], exclude=["0.0.0.0/24"],
name="foo", tenant_id=1) name="foo", tenant_id=1)
with self._stubs( with self._stubs(
ipp, subnets=[dict(id=1, ip_policy=None)] ipp, subnets=[dict(id=1, ip_policy=None)]
@@ -247,11 +287,12 @@ class TestQuarkUpdateIpPolicies(test_quark_plugin.TestQuarkPlugin):
self.context, self.context,
1, 1,
dict(ip_policy=dict(network_ids=[100]))) dict(ip_policy=dict(network_ids=[100])))
self.fai("HI")
self.assertEqual(ip_policy_update.called, 0) self.assertEqual(ip_policy_update.called, 0)
def test_update_ip_policy_networks(self): def test_update_ip_policy_networks(self):
ipp = dict(id=1, networks=[dict()], ipp = dict(id=1, networks=[dict()],
exclude=[dict(offset=0, length=256)], exclude=["0.0.0.0/24"],
name="foo", tenant_id=1) name="foo", tenant_id=1)
with self._stubs( with self._stubs(
ipp, networks=[dict(id=1, ip_policy=None)] ipp, networks=[dict(id=1, ip_policy=None)]
@@ -293,3 +334,75 @@ class TestQuarkDeleteIpPolicies(test_quark_plugin.TestQuarkPlugin):
self.plugin.delete_ip_policy(self.context, 1) self.plugin.delete_ip_policy(self.context, 1)
self.assertEqual(ip_policy_find.call_count, 1) self.assertEqual(ip_policy_find.call_count, 1)
self.assertEqual(ip_policy_delete.call_count, 1) self.assertEqual(ip_policy_delete.call_count, 1)
class TestQuarkValidateCIDRsFitsIntoSubnets(test_quark_plugin.TestQuarkPlugin):
def test_normal_cidr_and_valid_subnet(self):
try:
ippol._validate_cidrs_fit_into_subnets(
["192.168.0.100/32"], [dict(id=1, cidr="192.168.0.0/24")])
except Exception:
self.fail("Should not have failed")
def test_normal_ipv4_cidr_and_valid_ipv6_subnet(self):
try:
ippol._validate_cidrs_fit_into_subnets(
["192.168.0.100/32"], [dict(id=1, cidr="::/96")])
except Exception:
self.fail("Should not have failed")
def test_normal_ipv6_cidr_and_valid_ipv6_subnet(self):
try:
ippol._validate_cidrs_fit_into_subnets(
["::/128"], [dict(id=1, cidr="::/96")])
except Exception:
self.fail("Should not have failed")
def test_normal_ipv6_cidr_and_valid_ipv4_subnet(self):
try:
ippol._validate_cidrs_fit_into_subnets(
["::/128"], [dict(id=1, cidr="192.168.0.0/24")])
except Exception:
self.fail("Should not have failed")
def test_normal_cidr_and_multiple_valid_subnet(self):
try:
ippol._validate_cidrs_fit_into_subnets(
["192.168.0.100/32"],
[dict(id=1, cidr="192.168.0.0/24"),
dict(id=2, cidr="192.168.0.0/16")])
except Exception:
self.fail("Should not have failed")
def test_normal_ipv6_cidr_and_multiple_valid_ipv6_subnet(self):
try:
ippol._validate_cidrs_fit_into_subnets(
["::/128"],
[dict(id=1, cidr="::/96"),
dict(id=2, cidr="::/64")])
except Exception:
self.fail("Should not have failed")
def test_normal_cidr_and_invalid_subnet(self):
with self.assertRaises(exceptions.BadRequest):
ippol._validate_cidrs_fit_into_subnets(
["192.168.0.100/32"], [dict(id=1, cidr="10.10.10.0/24")])
def test_normal_ipv6_cidr_and_invalid_ipv6_subnet(self):
with self.assertRaises(exceptions.BadRequest):
ippol._validate_cidrs_fit_into_subnets(
["::/64"], [dict(id=1, cidr="::/96")])
def test_normal_cidr_and_one_invalid_and_one_valid_subnet(self):
with self.assertRaises(exceptions.BadRequest):
ippol._validate_cidrs_fit_into_subnets(
["192.168.0.100/32"],
[dict(id=1, cidr="10.10.10.0/24"),
dict(id=1, cidr="192.168.0.0/24")])
def test_normal_ipv6_cidr_and_one_invalid_and_one_valid_ipv6_subnet(self):
with self.assertRaises(exceptions.BadRequest):
ippol._validate_cidrs_fit_into_subnets(
["::/127"],
[dict(id=1, cidr="::/96"),
dict(id=1, cidr="::/128")])

View File

@@ -202,7 +202,7 @@ class TestQuarkCreateSubnetAllocationPools(test_quark_plugin.TestQuarkPlugin):
resp = self.plugin.create_subnet(self.context, s) resp = self.plugin.create_subnet(self.context, s)
self.assertEqual(subnet_create.call_count, 1) self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(resp["allocation_pools"], self.assertEqual(resp["allocation_pools"],
[dict(start="192.168.1.2", end="192.168.1.254")]) [dict(start="192.168.1.1", end="192.168.1.254")])
def test_create_subnet_allocation_pools_one(self): def test_create_subnet_allocation_pools_one(self):
pools = [dict(start="192.168.1.10", end="192.168.1.20")] pools = [dict(start="192.168.1.10", end="192.168.1.20")]

View File

@@ -22,22 +22,22 @@ class TestDBModels(test_base.TestBase):
def setUp(self): def setUp(self):
super(TestDBModels, self).setUp() super(TestDBModels, self).setUp()
def test_get_ip_policy_rule_set(self): def test_get_ip_policy_cidrs(self):
subnet = dict(id=1, ip_version=4, next_auto_assign_ip=0, subnet = dict(id=1, ip_version=4, next_auto_assign_ip=0,
cidr="0.0.0.0/24", first_ip=0, last_ip=255, cidr="0.0.0.0/24", first_ip=0, last_ip=255,
network=dict(ip_policy=None), ip_policy=None) network=dict(ip_policy=None), ip_policy=None)
ip_policy_rules = models.IPPolicy.get_ip_policy_rule_set(subnet) ip_policy_rules = models.IPPolicy.get_ip_policy_cidrs(subnet)
self.assertEqual(ip_policy_rules, self.assertEqual(ip_policy_rules,
IPSet(['0.0.0.0/31', '0.0.0.255/32'])) IPSet(['0.0.0.0/32', '0.0.0.255/32']))
def test_get_ip_policy_rule_set_v6(self): def test_get_ip_policy_cidrs_v6(self):
subnet = dict(id=1, ip_version=6, next_auto_assign_ip=0, subnet = dict(id=1, ip_version=6, next_auto_assign_ip=0,
cidr="fc00::/7", cidr="fc00::/7",
first_ip=334965454937798799971759379190646833152L, first_ip=334965454937798799971759379190646833152L,
last_ip=337623910929368631717566993311207522303L, last_ip=337623910929368631717566993311207522303L,
network=dict(ip_policy=None), ip_policy=None) network=dict(ip_policy=None), ip_policy=None)
ip_policy_rules = models.IPPolicy.get_ip_policy_rule_set(subnet) ip_policy_rules = models.IPPolicy.get_ip_policy_cidrs(subnet)
self.assertEqual( self.assertEqual(
ip_policy_rules, ip_policy_rules,
IPSet(["fc00::/127", IPSet(["fc00::/128",
"fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128"])) "fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128"]))

View File

@@ -305,7 +305,7 @@ class QuarkIpamTestBothIpAllocation(QuarkIpamBaseTest):
with self._stubs(subnets=[[(subnet4, 0)], [(subnet6, 0)]], with self._stubs(subnets=[[(subnet4, 0)], [(subnet6, 0)]],
addresses=[None, None, None, None]): addresses=[None, None, None, None]):
address = self.ipam.allocate_ip_address(self.context, 0, 0, 0) address = self.ipam.allocate_ip_address(self.context, 0, 0, 0)
self.assertEqual(address[0]["address"], 2) self.assertEqual(address[0]["address"], 1)
self.assertEqual(address[1]["address"], 0) self.assertEqual(address[1]["address"], 0)
def test_allocate_new_ip_address_one_v4_subnet_open(self): def test_allocate_new_ip_address_one_v4_subnet_open(self):
@@ -432,7 +432,7 @@ class QuarkIpamTestBothIpAllocation(QuarkIpamBaseTest):
self.assertEqual(len(address), 2) self.assertEqual(len(address), 2)
self.assertEqual(address[0]["address"], 4) self.assertEqual(address[0]["address"], 4)
self.assertEqual(address[0]["version"], 6) self.assertEqual(address[0]["version"], 6)
self.assertEqual(address[1]["address"], 2) self.assertEqual(address[1]["address"], 1)
self.assertEqual(address[1]["version"], 4) self.assertEqual(address[1]["version"], 4)
def test_reallocate_deallocated_v6_ip_as_string_address(self): def test_reallocate_deallocated_v6_ip_as_string_address(self):
@@ -450,7 +450,7 @@ class QuarkIpamTestBothIpAllocation(QuarkIpamBaseTest):
self.assertEqual(len(address), 2) self.assertEqual(len(address), 2)
self.assertEqual(address[0]["address"], "4") self.assertEqual(address[0]["address"], "4")
self.assertEqual(address[0]["version"], 6) self.assertEqual(address[0]["version"], 6)
self.assertEqual(address[1]["address"], 2) self.assertEqual(address[1]["address"], 1)
self.assertEqual(address[1]["version"], 4) self.assertEqual(address[1]["version"], 4)
def test_reallocate_deallocated_v4_v6(self): def test_reallocate_deallocated_v4_v6(self):
@@ -505,7 +505,7 @@ class QuarkIpamTestBothRequiredIpAllocation(QuarkIpamBaseTest):
with self._stubs(subnets=[[(subnet4, 0)], [(subnet6, 0)]], with self._stubs(subnets=[[(subnet4, 0)], [(subnet6, 0)]],
addresses=[None, None, None, None]): addresses=[None, None, None, None]):
address = self.ipam.allocate_ip_address(self.context, 0, 0, 0) address = self.ipam.allocate_ip_address(self.context, 0, 0, 0)
self.assertEqual(address[0]["address"], 2) self.assertEqual(address[0]["address"], 1)
self.assertEqual(address[1]["address"], 0) self.assertEqual(address[1]["address"], 0)
def test_allocate_new_ip_address_one_v4_subnet_open(self): def test_allocate_new_ip_address_one_v4_subnet_open(self):
@@ -567,7 +567,7 @@ class QuarkIpamTestBothRequiredIpAllocation(QuarkIpamBaseTest):
self.assertEqual(len(address), 2) self.assertEqual(len(address), 2)
self.assertEqual(address[0]["address"], 4) self.assertEqual(address[0]["address"], 4)
self.assertEqual(address[0]["version"], 6) self.assertEqual(address[0]["version"], 6)
self.assertEqual(address[1]["address"], 2) self.assertEqual(address[1]["address"], 1)
self.assertEqual(address[1]["version"], 4) self.assertEqual(address[1]["version"], 4)
def test_reallocate_deallocated_v4_v6(self): def test_reallocate_deallocated_v4_v6(self):
@@ -656,7 +656,7 @@ class QuarkNewIPAddressAllocation(QuarkIpamBaseTest):
with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]): with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]):
address = self.ipam.allocate_ip_address(self.context, 0, 0, 0, address = self.ipam.allocate_ip_address(self.context, 0, 0, 0,
version=4) version=4)
self.assertEqual(address[0]["address"], 2) # 0 => 2 self.assertEqual(address[0]["address"], 1) # 0 => 2
def test_allocate_new_ip_in_partially_allocated_range(self): def test_allocate_new_ip_in_partially_allocated_range(self):
addr = dict(id=1, address=3) addr = dict(id=1, address=3)
@@ -680,7 +680,7 @@ class QuarkNewIPAddressAllocation(QuarkIpamBaseTest):
subnets = [(subnet1, 1), (subnet2, 0)] subnets = [(subnet1, 1), (subnet2, 0)]
with self._stubs(subnets=subnets, addresses=[None, None]): with self._stubs(subnets=subnets, addresses=[None, None]):
address = self.ipam.allocate_ip_address(self.context, 0, 0, 0) address = self.ipam.allocate_ip_address(self.context, 0, 0, 0)
self.assertEqual(address[0]["address"], 258) # 256 => 258 self.assertEqual(address[0]["address"], 257)
self.assertEqual(address[0]["subnet_id"], 2) self.assertEqual(address[0]["subnet_id"], 2)
def test_allocate_ip_no_subnet_fails(self): def test_allocate_ip_no_subnet_fails(self):
@@ -708,7 +708,7 @@ class QuarkNewIPAddressAllocation(QuarkIpamBaseTest):
subnets = [(subnet1, 1), (subnet2, 1)] subnets = [(subnet1, 1), (subnet2, 1)]
with self._stubs(subnets=subnets, addresses=[None, None]): with self._stubs(subnets=subnets, addresses=[None, None]):
address = self.ipam.allocate_ip_address(self.context, 0, 0, 0) address = self.ipam.allocate_ip_address(self.context, 0, 0, 0)
self.assertEqual(address[0]["address"], 2) # 0 => 2 self.assertEqual(address[0]["address"], 1) # 0 => 2
self.assertEqual(address[0]["subnet_id"], 1) self.assertEqual(address[0]["subnet_id"], 1)
def test_find_requested_ip_subnet(self): def test_find_requested_ip_subnet(self):
@@ -818,7 +818,7 @@ class QuarkIPAddressAllocateDeallocated(QuarkIpamBaseTest):
False, subnet_mod, address0, addresses_found False, subnet_mod, address0, addresses_found
) as (choose_subnet): ) as (choose_subnet):
ipaddress = self.ipam.allocate_ip_address(self.context, 0, 0, 0) ipaddress = self.ipam.allocate_ip_address(self.context, 0, 0, 0)
self.assertEqual(ipaddress[0]["address"], 2) self.assertEqual(ipaddress[0]["address"], 1)
self.assertIsNotNone(ipaddress[0]['id']) self.assertIsNotNone(ipaddress[0]['id'])
self.assertTrue(choose_subnet.called) self.assertTrue(choose_subnet.called)
@@ -838,22 +838,23 @@ class TestQuarkIpPoliciesIpAllocation(QuarkIpamBaseTest):
subnet_find.return_value = subnets subnet_find.return_value = subnets
yield yield
def test_default_ip_policy_on_subnet(self): def test_first_ip_is_not_network_ip_by_default(self):
subnet = dict(id=1, first_ip=0, last_ip=255, subnet = dict(id=1, first_ip=0, last_ip=255,
cidr="0.0.0.0/24", ip_version=4, cidr="192.168.0.0/24", ip_version=4,
next_auto_assign_ip=0, network=dict(ip_policy=None), next_auto_assign_ip=3232235520,
ip_policy=dict(models.IPPolicy.DEFAULT_POLICY.policy)) network=dict(ip_policy=None),
ip_policy=None)
with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]): with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]):
address = self.ipam.allocate_ip_address(self.context, 0, 0, 0, address = self.ipam.allocate_ip_address(self.context, 0, 0, 0,
version=4) version=4)
self.assertEqual(address[0]["address"], 2) self.assertEqual(address[0]["address"], 3232235521)
def test_subnet_full_based_on_ip_policy(self): def test_subnet_full_based_on_ip_policy(self):
subnet = dict(id=1, first_ip=0, last_ip=255, subnet = dict(id=1, first_ip=0, last_ip=255,
cidr="0.0.0.0/24", ip_version=4, cidr="0.0.0.0/24", ip_version=4,
next_auto_assign_ip=0, network=dict(ip_policy=None), next_auto_assign_ip=0, network=dict(ip_policy=None),
ip_policy=dict(exclude= ip_policy=dict(exclude=[
[dict(offset=0, length=256)])) models.IPPolicyCIDR(cidr="0.0.0.0/24")]))
with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]): with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]):
with self.assertRaises(exceptions.IpAddressGenerationFailure): with self.assertRaises(exceptions.IpAddressGenerationFailure):
self.ipam.allocate_ip_address(self.context, 0, 0, 0, version=4) self.ipam.allocate_ip_address(self.context, 0, 0, 0, version=4)
@@ -862,27 +863,16 @@ class TestQuarkIpPoliciesIpAllocation(QuarkIpamBaseTest):
subnet = dict(id=1, first_ip=0, last_ip=255, subnet = dict(id=1, first_ip=0, last_ip=255,
cidr="0.0.0.0/24", ip_version=4, cidr="0.0.0.0/24", ip_version=4,
next_auto_assign_ip=0, network=dict(ip_policy=None), next_auto_assign_ip=0, network=dict(ip_policy=None),
ip_policy=dict(exclude= ip_policy=dict(exclude=[
[dict(offset=0, length=2)])) models.IPPolicyCIDR(cidr="0.0.0.0/31")]))
with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]):
address = self.ipam.allocate_ip_address(self.context, 0, 0, 0,
version=4)
self.assertEqual(address[0]["address"], 2)
def test_ip_policy_on_subnet_negative_offset(self):
subnet = dict(id=1, first_ip=0, last_ip=255,
cidr="0.0.0.0/24", ip_version=4,
next_auto_assign_ip=0, network=dict(ip_policy=None),
ip_policy=dict(exclude=
[dict(offset=-1, length=3)]))
with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]): with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]):
address = self.ipam.allocate_ip_address(self.context, 0, 0, 0, address = self.ipam.allocate_ip_address(self.context, 0, 0, 0,
version=4) version=4)
self.assertEqual(address[0]["address"], 2) self.assertEqual(address[0]["address"], 2)
def test_ip_policy_on_network(self): def test_ip_policy_on_network(self):
net = dict(ip_policy=dict(exclude= net = dict(ip_policy=dict(exclude=[
[dict(offset=0, length=2)])) models.IPPolicyCIDR(cidr="0.0.0.0/31")]))
subnet = dict(id=1, first_ip=0, last_ip=255, subnet = dict(id=1, first_ip=0, last_ip=255,
cidr="0.0.0.0/24", ip_version=4, cidr="0.0.0.0/24", ip_version=4,
next_auto_assign_ip=0, network=net, next_auto_assign_ip=0, network=net,
@@ -893,9 +883,9 @@ class TestQuarkIpPoliciesIpAllocation(QuarkIpamBaseTest):
self.assertEqual(address[0]["address"], 2) self.assertEqual(address[0]["address"], 2)
def test_ip_policy_on_network_exclusion_intersection(self): def test_ip_policy_on_network_exclusion_intersection(self):
net = dict(ip_policy=dict(exclude= net = dict(ip_policy=dict(exclude=[
[dict(offset=0, length=2), models.IPPolicyCIDR(cidr="0.0.0.0/31"),
dict(offset=254, length=1)])) models.IPPolicyCIDR(cidr="0.0.0.254/32")]))
subnet = dict(id=1, first_ip=0, last_ip=255, subnet = dict(id=1, first_ip=0, last_ip=255,
cidr="0.0.0.0/24", ip_version=4, cidr="0.0.0.0/24", ip_version=4,
next_auto_assign_ip=0, network=net, next_auto_assign_ip=0, network=net,
@@ -906,25 +896,24 @@ class TestQuarkIpPoliciesIpAllocation(QuarkIpamBaseTest):
self.assertEqual(address[0]["address"], 2) self.assertEqual(address[0]["address"], 2)
def test_ip_policy_on_both_subnet_preferred(self): def test_ip_policy_on_both_subnet_preferred(self):
net = dict(ip_policy=dict(exclude= net = dict(ip_policy=dict(exclude=[
[dict(offset=0, length=1), models.IPPolicyCIDR(cidr="0.0.0.0/31")]))
dict(offset=1, length=1)]))
subnet = dict(id=1, first_ip=0, last_ip=255, subnet = dict(id=1, first_ip=0, last_ip=255,
cidr="0.0.0.0/24", ip_version=4, cidr="0.0.0.0/24", ip_version=4,
next_auto_assign_ip=0, network=net, next_auto_assign_ip=0, network=net,
ip_policy=dict(exclude= ip_policy=dict(exclude=[
[dict(offset=254, length=1), models.IPPolicyCIDR(cidr="0.0.0.254/31")]))
dict(offset=255, length=1)]))
with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]): with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]):
address = self.ipam.allocate_ip_address(self.context, 0, 0, 0, address = self.ipam.allocate_ip_address(self.context, 0, 0, 0,
version=4) version=4)
self.assertEqual(address[0]["address"], 2) # 0 => 2 self.assertEqual(address[0]["address"], 1)
def test_ip_policy_allows_specified_ip(self): def test_ip_policy_allows_specified_ip(self):
subnet1 = dict(id=1, first_ip=0, last_ip=255, subnet1 = dict(id=1, first_ip=0, last_ip=255,
cidr="0.0.0.0/24", ip_version=4, cidr="0.0.0.0/24", ip_version=4,
network=dict(ip_policy=None), network=dict(ip_policy=None),
ip_policy=dict(exclude=[dict(offset=240, length=1)])) ip_policy=dict(exclude=[
models.IPPolicyCIDR(cidr="0.0.0.240/32")]))
subnets = [(subnet1, 1)] subnets = [(subnet1, 1)]
with self._stubs(subnets=subnets, addresses=[None, None]): with self._stubs(subnets=subnets, addresses=[None, None]):
address = self.ipam.allocate_ip_address( address = self.ipam.allocate_ip_address(