Pulls Mac Address Range plugin support out
Refactors the quark plugin by moving Mac Address Ranges and associated tests into separate submodules. Also includes a hack for a pretty heinous bug. Check out https://github.com/jkoelker/quark/issues/130 for details.
This commit is contained in:
@@ -125,11 +125,8 @@ class QuarkIpam(object):
|
||||
if ip_address:
|
||||
next_ip = ip_address
|
||||
address = db_api.ip_address_find(
|
||||
elevated,
|
||||
network_id=net_id,
|
||||
ip_address=next_ip,
|
||||
tenant_id=elevated.tenant_id,
|
||||
scope=db_api.ONE)
|
||||
elevated, network_id=net_id, ip_address=next_ip,
|
||||
tenant_id=elevated.tenant_id, scope=db_api.ONE)
|
||||
if address:
|
||||
raise exceptions.IpAddressGenerationFailure(net_id=net_id)
|
||||
else:
|
||||
@@ -143,11 +140,8 @@ class QuarkIpam(object):
|
||||
if ip_policy_rules and next_ip in ip_policy_rules:
|
||||
continue
|
||||
address = db_api.ip_address_find(
|
||||
elevated,
|
||||
network_id=net_id,
|
||||
ip_address=next_ip,
|
||||
tenant_id=elevated.tenant_id,
|
||||
scope=db_api.ONE)
|
||||
elevated, network_id=net_id, ip_address=next_ip,
|
||||
tenant_id=elevated.tenant_id, scope=db_api.ONE)
|
||||
|
||||
address = db_api.ip_address_create(
|
||||
elevated, address=next_ip, subnet_id=subnet["id"],
|
||||
|
||||
110
quark/plugin.py
110
quark/plugin.py
@@ -44,6 +44,7 @@ from quark.db import api as db_api
|
||||
from quark.db import models
|
||||
from quark import exceptions as quark_exceptions
|
||||
from quark import network_strategy
|
||||
from quark.plugin_modules import mac_address_ranges
|
||||
from quark import plugin_views as v
|
||||
|
||||
LOG = logging.getLogger("neutron.quark")
|
||||
@@ -75,11 +76,19 @@ quark_quota_opts = [
|
||||
default=20,
|
||||
help=_('Maximum security group rules in a group')),
|
||||
]
|
||||
quark_resources = [
|
||||
quota.BaseResource('ports_per_network',
|
||||
'quota_ports_per_network'),
|
||||
quota.BaseResource('security_rules_per_group',
|
||||
'quota_security_rules_per_group'),
|
||||
]
|
||||
|
||||
STRATEGY = network_strategy.STRATEGY
|
||||
CONF.register_opts(quark_opts, "QUARK")
|
||||
CONF.register_opts(quark_quota_opts, "QUOTAS")
|
||||
|
||||
quota.QUOTAS.register_resources(quark_resources)
|
||||
|
||||
|
||||
def _pop_param(attrs, param, default=None):
|
||||
val = attrs.pop(param, default)
|
||||
@@ -137,14 +146,6 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
||||
self.ipam_reuse_after = CONF.QUARK.ipam_reuse_after
|
||||
neutron_db_api.register_models(base=models.BASEV2)
|
||||
|
||||
quark_resources = [
|
||||
quota.BaseResource('ports_per_network',
|
||||
'quota_ports_per_network'),
|
||||
quota.BaseResource('security_rules_per_group',
|
||||
'quota_security_rules_per_group'),
|
||||
]
|
||||
quota.QUOTAS.register_resources(quark_resources)
|
||||
|
||||
def _make_security_group_list(self, context, group_ids):
|
||||
if not group_ids or group_ids is attributes.ATTR_NOT_SPECIFIED:
|
||||
return ([], [])
|
||||
@@ -888,87 +889,6 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
||||
the_address["deallocated"] = 1
|
||||
return v._make_port_dict(port)
|
||||
|
||||
def get_mac_address_range(self, context, id, fields=None):
|
||||
"""Retrieve a mac_address_range.
|
||||
|
||||
: param context: neutron api request context
|
||||
: param id: UUID representing the network to fetch.
|
||||
: param fields: a list of strings that are valid keys in a
|
||||
network dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
|
||||
object in neutron/api/v2/attributes.py. Only these fields
|
||||
will be returned.
|
||||
"""
|
||||
LOG.info("get_mac_address_range %s for tenant %s fields %s" %
|
||||
(id, context.tenant_id, fields))
|
||||
|
||||
mac_address_range = db_api.mac_address_range_find(
|
||||
context, id=id, scope=db_api.ONE)
|
||||
|
||||
if not mac_address_range:
|
||||
raise quark_exceptions.MacAddressRangeNotFound(
|
||||
mac_address_range_id=id)
|
||||
return v._make_mac_range_dict(mac_address_range)
|
||||
|
||||
def get_mac_address_ranges(self, context):
|
||||
LOG.info("get_mac_address_ranges for tenant %s" % context.tenant_id)
|
||||
ranges = db_api.mac_address_range_find(context)
|
||||
return [v._make_mac_range_dict(m) for m in ranges]
|
||||
|
||||
def create_mac_address_range(self, context, mac_range):
|
||||
LOG.info("create_mac_address_range for tenant %s" % context.tenant_id)
|
||||
cidr = mac_range["mac_address_range"]["cidr"]
|
||||
cidr, first_address, last_address = self._to_mac_range(cidr)
|
||||
new_range = db_api.mac_address_range_create(
|
||||
context, cidr=cidr, first_address=first_address,
|
||||
last_address=last_address, next_auto_assign_mac=first_address)
|
||||
return v._make_mac_range_dict(new_range)
|
||||
|
||||
def _to_mac_range(self, val):
|
||||
cidr_parts = val.split("/")
|
||||
prefix = cidr_parts[0]
|
||||
|
||||
#FIXME(anyone): replace is slow, but this doesn't really
|
||||
# get called ever. Fix maybe?
|
||||
prefix = prefix.replace(':', '')
|
||||
prefix = prefix.replace('-', '')
|
||||
prefix_length = len(prefix)
|
||||
if prefix_length < 6 or prefix_length > 10:
|
||||
raise quark_exceptions.InvalidMacAddressRange(cidr=val)
|
||||
|
||||
diff = 12 - len(prefix)
|
||||
if len(cidr_parts) > 1:
|
||||
mask = int(cidr_parts[1])
|
||||
else:
|
||||
mask = 48 - diff * 4
|
||||
mask_size = 1 << (48 - mask)
|
||||
prefix = "%s%s" % (prefix, "0" * diff)
|
||||
try:
|
||||
cidr = "%s/%s" % (str(netaddr.EUI(prefix)).replace("-", ":"), mask)
|
||||
except netaddr.AddrFormatError:
|
||||
raise quark_exceptions.InvalidMacAddressRange(cidr=val)
|
||||
prefix_int = int(prefix, base=16)
|
||||
return cidr, prefix_int, prefix_int + mask_size
|
||||
|
||||
def _delete_mac_address_range(self, context, mac_address_range):
|
||||
if mac_address_range.allocated_macs:
|
||||
raise quark_exceptions.MacAddressRangeInUse(
|
||||
mac_address_range_id=mac_address_range["id"])
|
||||
db_api.mac_address_range_delete(context, mac_address_range)
|
||||
|
||||
def delete_mac_address_range(self, context, id):
|
||||
"""Delete a mac_address_range.
|
||||
|
||||
: param context: neutron api request context
|
||||
: param id: UUID representing the mac_address_range to delete.
|
||||
"""
|
||||
LOG.info("delete_mac_address_range %s for tenant %s" %
|
||||
(id, context.tenant_id))
|
||||
mar = db_api.mac_address_range_find(context, id=id, scope=db_api.ONE)
|
||||
if not mar:
|
||||
raise quark_exceptions.MacAddressRangeNotFound(
|
||||
mac_address_range_id=id)
|
||||
self._delete_mac_address_range(context, mar)
|
||||
|
||||
def get_route(self, context, id):
|
||||
LOG.info("get_route %s for tenant %s" % (id, context.tenant_id))
|
||||
route = db_api.route_find(context, id=id, scope=db_api.ONE)
|
||||
@@ -1309,3 +1229,15 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
||||
if not ipp:
|
||||
raise quark_exceptions.IPPolicyNotFound(id=id)
|
||||
db_api.ip_policy_delete(context, ipp)
|
||||
|
||||
def get_mac_address_range(self, context, id, fields=None):
|
||||
return mac_address_ranges.get_mac_address_range(context, id, fields)
|
||||
|
||||
def get_mac_address_ranges(self, context):
|
||||
return mac_address_ranges.get_mac_address_ranges(context)
|
||||
|
||||
def create_mac_address_range(self, context, mac_range):
|
||||
return mac_address_ranges.create_mac_address_range(context, mac_range)
|
||||
|
||||
def delete_mac_address_range(self, context, id):
|
||||
mac_address_ranges.delete_mac_address_range(context, id)
|
||||
|
||||
0
quark/plugin_modules/__init__.py
Normal file
0
quark/plugin_modules/__init__.py
Normal file
110
quark/plugin_modules/mac_address_ranges.py
Normal file
110
quark/plugin_modules/mac_address_ranges.py
Normal file
@@ -0,0 +1,110 @@
|
||||
# Copyright 2013 Openstack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import netaddr
|
||||
from neutron.openstack.common import log as logging
|
||||
|
||||
from quark.db import api as db_api
|
||||
from quark import exceptions as quark_exceptions
|
||||
from quark import plugin_views as v
|
||||
|
||||
LOG = logging.getLogger("neutron.quark")
|
||||
|
||||
|
||||
def _to_mac_range(val):
|
||||
cidr_parts = val.split("/")
|
||||
prefix = cidr_parts[0]
|
||||
|
||||
#FIXME(anyone): replace is slow, but this doesn't really
|
||||
# get called ever. Fix maybe?
|
||||
prefix = prefix.replace(':', '')
|
||||
prefix = prefix.replace('-', '')
|
||||
prefix_length = len(prefix)
|
||||
if prefix_length < 6 or prefix_length > 10:
|
||||
raise quark_exceptions.InvalidMacAddressRange(cidr=val)
|
||||
|
||||
diff = 12 - len(prefix)
|
||||
if len(cidr_parts) > 1:
|
||||
mask = int(cidr_parts[1])
|
||||
else:
|
||||
mask = 48 - diff * 4
|
||||
mask_size = 1 << (48 - mask)
|
||||
prefix = "%s%s" % (prefix, "0" * diff)
|
||||
try:
|
||||
cidr = "%s/%s" % (str(netaddr.EUI(prefix)).replace("-", ":"), mask)
|
||||
except netaddr.AddrFormatError:
|
||||
raise quark_exceptions.InvalidMacAddressRange(cidr=val)
|
||||
prefix_int = int(prefix, base=16)
|
||||
return cidr, prefix_int, prefix_int + mask_size
|
||||
|
||||
|
||||
def get_mac_address_range(context, id, fields=None):
|
||||
"""Retrieve a mac_address_range.
|
||||
|
||||
: param context: neutron api request context
|
||||
: param id: UUID representing the network to fetch.
|
||||
: param fields: a list of strings that are valid keys in a
|
||||
network dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
|
||||
object in neutron/api/v2/attributes.py. Only these fields
|
||||
will be returned.
|
||||
"""
|
||||
LOG.info("get_mac_address_range %s for tenant %s fields %s" %
|
||||
(id, context.tenant_id, fields))
|
||||
|
||||
mac_address_range = db_api.mac_address_range_find(
|
||||
context, id=id, scope=db_api.ONE)
|
||||
|
||||
if not mac_address_range:
|
||||
raise quark_exceptions.MacAddressRangeNotFound(
|
||||
mac_address_range_id=id)
|
||||
return v._make_mac_range_dict(mac_address_range)
|
||||
|
||||
|
||||
def get_mac_address_ranges(context):
|
||||
LOG.info("get_mac_address_ranges for tenant %s" % context.tenant_id)
|
||||
ranges = db_api.mac_address_range_find(context)
|
||||
return [v._make_mac_range_dict(m) for m in ranges]
|
||||
|
||||
|
||||
def create_mac_address_range(context, mac_range):
|
||||
LOG.info("create_mac_address_range for tenant %s" % context.tenant_id)
|
||||
cidr = mac_range["mac_address_range"]["cidr"]
|
||||
cidr, first_address, last_address = _to_mac_range(cidr)
|
||||
new_range = db_api.mac_address_range_create(
|
||||
context, cidr=cidr, first_address=first_address,
|
||||
last_address=last_address, next_auto_assign_mac=first_address)
|
||||
return v._make_mac_range_dict(new_range)
|
||||
|
||||
|
||||
def _delete_mac_address_range(context, mac_address_range):
|
||||
if mac_address_range.allocated_macs:
|
||||
raise quark_exceptions.MacAddressRangeInUse(
|
||||
mac_address_range_id=mac_address_range["id"])
|
||||
db_api.mac_address_range_delete(context, mac_address_range)
|
||||
|
||||
|
||||
def delete_mac_address_range(context, id):
|
||||
"""Delete a mac_address_range.
|
||||
|
||||
: param context: neutron api request context
|
||||
: param id: UUID representing the mac_address_range to delete.
|
||||
"""
|
||||
LOG.info("delete_mac_address_range %s for tenant %s" %
|
||||
(id, context.tenant_id))
|
||||
mar = db_api.mac_address_range_find(context, id=id, scope=db_api.ONE)
|
||||
if not mar:
|
||||
raise quark_exceptions.MacAddressRangeNotFound(
|
||||
mac_address_range_id=id)
|
||||
_delete_mac_address_range(context, mar)
|
||||
0
quark/tests/plugin_modules/__init__.py
Normal file
0
quark/tests/plugin_modules/__init__.py
Normal file
146
quark/tests/plugin_modules/test_mac_address_ranges.py
Normal file
146
quark/tests/plugin_modules/test_mac_address_ranges.py
Normal file
@@ -0,0 +1,146 @@
|
||||
# Copyright 2013 Openstack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
import mock
|
||||
import netaddr
|
||||
|
||||
from quark import exceptions as quark_exceptions
|
||||
from quark.plugin_modules import mac_address_ranges
|
||||
from quark.tests import test_quark_plugin
|
||||
|
||||
|
||||
class TestQuarkGetMacAddressRanges(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, mac_range):
|
||||
db_mod = "quark.db.api"
|
||||
with mock.patch("%s.mac_address_range_find" % db_mod) as mar_find:
|
||||
mar_find.return_value = mac_range
|
||||
yield
|
||||
|
||||
def test_find_mac_ranges(self):
|
||||
mar = dict(id=1, cidr="AA:BB:CC/24")
|
||||
with self._stubs([mar]):
|
||||
res = self.plugin.get_mac_address_ranges(self.context)
|
||||
self.assertEqual(res[0]["id"], mar["id"])
|
||||
self.assertEqual(res[0]["cidr"], mar["cidr"])
|
||||
|
||||
def test_find_mac_range(self):
|
||||
mar = dict(id=1, cidr="AA:BB:CC/24")
|
||||
with self._stubs(mar):
|
||||
res = self.plugin.get_mac_address_range(self.context, 1)
|
||||
self.assertEqual(res["id"], mar["id"])
|
||||
self.assertEqual(res["cidr"], mar["cidr"])
|
||||
|
||||
def test_find_mac_range_fail(self):
|
||||
with self._stubs(None):
|
||||
with self.assertRaises(quark_exceptions.MacAddressRangeNotFound):
|
||||
self.plugin.get_mac_address_range(self.context, 1)
|
||||
|
||||
|
||||
class TestQuarkCreateMacAddressRanges(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, mac_range):
|
||||
db_mod = "quark.db.api"
|
||||
with mock.patch("%s.mac_address_range_create" % db_mod) as mar_create:
|
||||
mar_create.return_value = mac_range
|
||||
yield
|
||||
|
||||
def test_create_range(self):
|
||||
mar = dict(mac_address_range=dict(id=1, cidr="AA:BB:CC/24"))
|
||||
with self._stubs(mar["mac_address_range"]):
|
||||
res = self.plugin.create_mac_address_range(self.context, mar)
|
||||
self.assertEqual(res["id"], mar["mac_address_range"]["id"])
|
||||
self.assertEqual(res["cidr"], mar["mac_address_range"]["cidr"])
|
||||
|
||||
def test_to_mac_range_cidr_format(self):
|
||||
cidr, first, last = mac_address_ranges._to_mac_range("AA:BB:CC/24")
|
||||
first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
|
||||
last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
|
||||
self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
|
||||
self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
|
||||
self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
|
||||
|
||||
def test_to_mac_range_just_prefix(self):
|
||||
cidr, first, last = mac_address_ranges._to_mac_range("AA:BB:CC")
|
||||
first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
|
||||
last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
|
||||
self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
|
||||
self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
|
||||
self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
|
||||
|
||||
def test_to_mac_range_unix_format(self):
|
||||
cidr, first, last = mac_address_ranges._to_mac_range("AA-BB-CC")
|
||||
first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
|
||||
last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
|
||||
self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
|
||||
self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
|
||||
self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
|
||||
|
||||
def test_to_mac_range_unix_cidr_format(self):
|
||||
cidr, first, last = mac_address_ranges._to_mac_range("AA-BB-CC/24")
|
||||
first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
|
||||
last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
|
||||
self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
|
||||
self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
|
||||
self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
|
||||
|
||||
def test_to_mac_prefix_too_short_fails(self):
|
||||
with self.assertRaises(quark_exceptions.InvalidMacAddressRange):
|
||||
cidr, first, last = mac_address_ranges._to_mac_range("AA-BB")
|
||||
|
||||
def test_to_mac_prefix_too_long_fails(self):
|
||||
with self.assertRaises(quark_exceptions.InvalidMacAddressRange):
|
||||
cidr, first, last = mac_address_ranges._to_mac_range(
|
||||
"AA-BB-CC-DD-EE-F0")
|
||||
|
||||
def test_to_mac_prefix_is_garbage_fails(self):
|
||||
with self.assertRaises(quark_exceptions.InvalidMacAddressRange):
|
||||
cidr, first, last = mac_address_ranges._to_mac_range("F0-0-BAR")
|
||||
|
||||
|
||||
class TestQuarkDeleteMacAddressRanges(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, mac_range):
|
||||
db_mod = "quark.db.api"
|
||||
with contextlib.nested(
|
||||
mock.patch("%s.mac_address_range_find" % db_mod),
|
||||
mock.patch("%s.mac_address_range_delete" % db_mod),
|
||||
) as (mar_find, mar_delete):
|
||||
mar_find.return_value = mac_range
|
||||
yield mar_delete
|
||||
|
||||
def test_mac_address_range_delete_not_found(self):
|
||||
with self._stubs(None):
|
||||
with self.assertRaises(quark_exceptions.MacAddressRangeNotFound):
|
||||
self.plugin.delete_mac_address_range(self.context, 1)
|
||||
|
||||
def test_mac_address_range_delete_in_use(self):
|
||||
mar = mock.MagicMock()
|
||||
mar.id = 1
|
||||
mar.allocated_macs = 1
|
||||
with self._stubs(mar):
|
||||
with self.assertRaises(quark_exceptions.MacAddressRangeInUse):
|
||||
self.plugin.delete_mac_address_range(self.context, 1)
|
||||
|
||||
def test_mac_address_range_delete_success(self):
|
||||
mar = mock.MagicMock()
|
||||
mar.id = 1
|
||||
mar.allocated_macs = 0
|
||||
with self._stubs(mar) as mar_delete:
|
||||
resp = self.plugin.delete_mac_address_range(self.context, 1)
|
||||
self.assertIsNone(resp)
|
||||
mar_delete.assert_called_once_with(self.context, mar)
|
||||
@@ -22,6 +22,7 @@ from oslo.config import cfg
|
||||
|
||||
from quark.db import models
|
||||
import quark.ipam
|
||||
import quark.plugin
|
||||
|
||||
from quark.tests import test_base
|
||||
|
||||
@@ -35,6 +36,10 @@ class QuarkIpamBaseTest(test_base.TestBase):
|
||||
models.BASEV2.metadata.create_all(neutron_session._ENGINE)
|
||||
self.ipam = quark.ipam.QuarkIpam()
|
||||
|
||||
# FIXME(mdietz): refactor around issue #130 and remove this
|
||||
# Ensures that the perhaps_generate_uuid event handler is initialized
|
||||
self.plugin = quark.plugin.Plugin()
|
||||
|
||||
def tearDown(self):
|
||||
neutron_db_api.clear_db()
|
||||
|
||||
@@ -348,7 +353,7 @@ class QuarkIPAddressAllocateDeallocated(QuarkIpamBaseTest):
|
||||
False, subnet, address, addresses_found
|
||||
) as (choose_subnet):
|
||||
ipaddress = self.ipam.allocate_ip_address(self.context, 0, 0, 0)
|
||||
self.assertIsNone(ipaddress['id'])
|
||||
self.assertIsNotNone(ipaddress['id'])
|
||||
self.assertTrue(choose_subnet.called)
|
||||
|
||||
def test_allocate_finds_gap_in_address_space(self):
|
||||
@@ -368,7 +373,7 @@ class QuarkIPAddressAllocateDeallocated(QuarkIpamBaseTest):
|
||||
) as (choose_subnet):
|
||||
ipaddress = self.ipam.allocate_ip_address(self.context, 0, 0, 0)
|
||||
self.assertEqual(ipaddress["address"], 2)
|
||||
self.assertIsNone(ipaddress['id'])
|
||||
self.assertIsNotNone(ipaddress['id'])
|
||||
self.assertTrue(choose_subnet.called)
|
||||
|
||||
|
||||
|
||||
@@ -1568,128 +1568,6 @@ class TestQuarkDisassociatePort(TestQuarkPlugin):
|
||||
self.plugin.disassociate_port(self.context, 1, 1)
|
||||
|
||||
|
||||
class TestQuarkGetMacAddressRanges(TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, mac_range):
|
||||
db_mod = "quark.db.api"
|
||||
with mock.patch("%s.mac_address_range_find" % db_mod) as mar_find:
|
||||
mar_find.return_value = mac_range
|
||||
yield
|
||||
|
||||
def test_find_mac_ranges(self):
|
||||
mar = dict(id=1, cidr="AA:BB:CC/24")
|
||||
with self._stubs([mar]):
|
||||
res = self.plugin.get_mac_address_ranges(self.context)
|
||||
self.assertEqual(res[0]["id"], mar["id"])
|
||||
self.assertEqual(res[0]["cidr"], mar["cidr"])
|
||||
|
||||
def test_find_mac_range(self):
|
||||
mar = dict(id=1, cidr="AA:BB:CC/24")
|
||||
with self._stubs(mar):
|
||||
res = self.plugin.get_mac_address_range(self.context, 1)
|
||||
self.assertEqual(res["id"], mar["id"])
|
||||
self.assertEqual(res["cidr"], mar["cidr"])
|
||||
|
||||
def test_find_mac_range_fail(self):
|
||||
with self._stubs(None):
|
||||
with self.assertRaises(quark_exceptions.MacAddressRangeNotFound):
|
||||
self.plugin.get_mac_address_range(self.context, 1)
|
||||
|
||||
|
||||
class TestQuarkCreateMacAddressRanges(TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, mac_range):
|
||||
db_mod = "quark.db.api"
|
||||
with mock.patch("%s.mac_address_range_create" % db_mod) as mar_create:
|
||||
mar_create.return_value = mac_range
|
||||
yield
|
||||
|
||||
def test_create_range(self):
|
||||
mar = dict(mac_address_range=dict(id=1, cidr="AA:BB:CC/24"))
|
||||
with self._stubs(mar["mac_address_range"]):
|
||||
res = self.plugin.create_mac_address_range(self.context, mar)
|
||||
self.assertEqual(res["id"], mar["mac_address_range"]["id"])
|
||||
self.assertEqual(res["cidr"], mar["mac_address_range"]["cidr"])
|
||||
|
||||
def test_to_mac_range_cidr_format(self):
|
||||
cidr, first, last = self.plugin._to_mac_range("AA:BB:CC/24")
|
||||
first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
|
||||
last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
|
||||
self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
|
||||
self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
|
||||
self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
|
||||
|
||||
def test_to_mac_range_just_prefix(self):
|
||||
cidr, first, last = self.plugin._to_mac_range("AA:BB:CC")
|
||||
first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
|
||||
last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
|
||||
self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
|
||||
self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
|
||||
self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
|
||||
|
||||
def test_to_mac_range_unix_format(self):
|
||||
cidr, first, last = self.plugin._to_mac_range("AA-BB-CC")
|
||||
first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
|
||||
last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
|
||||
self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
|
||||
self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
|
||||
self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
|
||||
|
||||
def test_to_mac_range_unix_cidr_format(self):
|
||||
cidr, first, last = self.plugin._to_mac_range("AA-BB-CC/24")
|
||||
first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
|
||||
last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
|
||||
self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
|
||||
self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
|
||||
self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
|
||||
|
||||
def test_to_mac_prefix_too_short_fails(self):
|
||||
with self.assertRaises(quark_exceptions.InvalidMacAddressRange):
|
||||
cidr, first, last = self.plugin._to_mac_range("AA-BB")
|
||||
|
||||
def test_to_mac_prefix_too_long_fails(self):
|
||||
with self.assertRaises(quark_exceptions.InvalidMacAddressRange):
|
||||
cidr, first, last = self.plugin._to_mac_range("AA-BB-CC-DD-EE-F0")
|
||||
|
||||
def test_to_mac_prefix_is_garbage_fails(self):
|
||||
with self.assertRaises(quark_exceptions.InvalidMacAddressRange):
|
||||
cidr, first, last = self.plugin._to_mac_range("F0-0-BAR")
|
||||
|
||||
|
||||
class TestQuarkDeleteMacAddressRanges(TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, mac_range):
|
||||
db_mod = "quark.db.api"
|
||||
with contextlib.nested(
|
||||
mock.patch("%s.mac_address_range_find" % db_mod),
|
||||
mock.patch("%s.mac_address_range_delete" % db_mod),
|
||||
) as (mar_find, mar_delete):
|
||||
mar_find.return_value = mac_range
|
||||
yield mar_delete
|
||||
|
||||
def test_mac_address_range_delete_not_found(self):
|
||||
with self._stubs(None):
|
||||
with self.assertRaises(quark_exceptions.MacAddressRangeNotFound):
|
||||
self.plugin.delete_mac_address_range(self.context, 1)
|
||||
|
||||
def test_mac_address_range_delete_in_use(self):
|
||||
mar = mock.MagicMock()
|
||||
mar.id = 1
|
||||
mar.allocated_macs = 1
|
||||
with self._stubs(mar):
|
||||
with self.assertRaises(quark_exceptions.MacAddressRangeInUse):
|
||||
self.plugin.delete_mac_address_range(self.context, 1)
|
||||
|
||||
def test_mac_address_range_delete_success(self):
|
||||
mar = mock.MagicMock()
|
||||
mar.id = 1
|
||||
mar.allocated_macs = 0
|
||||
with self._stubs(mar) as mar_delete:
|
||||
resp = self.plugin.delete_mac_address_range(self.context, 1)
|
||||
self.assertIsNone(resp)
|
||||
mar_delete.assert_called_once_with(self.context, mar)
|
||||
|
||||
|
||||
class TestQuarkGetRoutes(TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, routes):
|
||||
@@ -1891,7 +1769,6 @@ class TestQuarkCreateSecurityGroup(TestQuarkPlugin):
|
||||
result = self.plugin.create_security_group(
|
||||
self.context, {'security_group': group})
|
||||
self.assertTrue(group_create.called)
|
||||
print "expected: %s but got: %s" % (expected, result)
|
||||
for key in expected.keys():
|
||||
self.assertEqual(result[key], expected[key])
|
||||
|
||||
@@ -2004,7 +1881,6 @@ class TestQuarkCreateSecurityGroupRule(TestQuarkPlugin):
|
||||
result = self.plugin.create_security_group_rule(
|
||||
self.context, {'security_group_rule': rule})
|
||||
self.assertTrue(rule_create.called)
|
||||
print "expected: %s but got: %s" % (expected, result)
|
||||
for key in expected.keys():
|
||||
self.assertEqual(expected[key], result[key])
|
||||
|
||||
@@ -2072,7 +1948,6 @@ class TestQuarkDeleteSecurityGroupRule(TestQuarkPlugin):
|
||||
if rule:
|
||||
dbrule = models.SecurityGroupRule()
|
||||
dbrule.update(dict(rule, group=dbgroup))
|
||||
print rule
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch("quark.db.api.security_group_find"),
|
||||
|
||||
Reference in New Issue
Block a user