Moves IP Policies into their own plugin module
Refactors plugin.py, moving IP policies and associated tests into their own modules for enchanced plugin.py readability.
This commit is contained in:
@@ -41,6 +41,7 @@ from quark.db import api as db_api
|
|||||||
from quark.db import models
|
from quark.db import models
|
||||||
from quark import exceptions as quark_exceptions
|
from quark import exceptions as quark_exceptions
|
||||||
from quark import network_strategy
|
from quark import network_strategy
|
||||||
|
from quark.plugin_modules import ip_policies
|
||||||
from quark.plugin_modules import mac_address_ranges
|
from quark.plugin_modules import mac_address_ranges
|
||||||
from quark.plugin_modules import security_groups
|
from quark.plugin_modules import security_groups
|
||||||
from quark import plugin_views as v
|
from quark import plugin_views as v
|
||||||
@@ -942,59 +943,6 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
|||||||
|
|
||||||
return v._make_ip_dict(address)
|
return v._make_ip_dict(address)
|
||||||
|
|
||||||
def create_ip_policy(self, context, ip_policy):
|
|
||||||
LOG.info("create_ip_policy for tenant %s" % context.tenant_id)
|
|
||||||
|
|
||||||
ipp = ip_policy["ip_policy"]
|
|
||||||
|
|
||||||
if not ipp.get("exclude"):
|
|
||||||
raise exceptions.BadRequest(resource="ip_policy",
|
|
||||||
msg="Empty ip_policy.exclude regions")
|
|
||||||
|
|
||||||
ipp["exclude"] = netaddr.IPSet(ipp["exclude"])
|
|
||||||
network_id = ipp.get("network_id")
|
|
||||||
subnet_id = ipp.get("subnet_id")
|
|
||||||
|
|
||||||
model = None
|
|
||||||
if subnet_id:
|
|
||||||
model = db_api.subnet_find(context, id=subnet_id, scope=db_api.ONE)
|
|
||||||
if not model:
|
|
||||||
raise exceptions.SubnetNotFound(id=subnet_id)
|
|
||||||
elif network_id:
|
|
||||||
model = db_api.network_find(context, id=network_id,
|
|
||||||
scope=db_api.ONE)
|
|
||||||
if not model:
|
|
||||||
raise exceptions.NetworkNotFound(id=network_id)
|
|
||||||
else:
|
|
||||||
raise exceptions.BadRequest(
|
|
||||||
resource="ip_policy",
|
|
||||||
msg="network_id or subnet_id unspecified")
|
|
||||||
|
|
||||||
if model["ip_policy"]:
|
|
||||||
raise quark_exceptions.IPPolicyAlreadyExists(
|
|
||||||
id=model["ip_policy"]["id"], n_id=model["id"])
|
|
||||||
model["ip_policy"] = db_api.ip_policy_create(context, **ipp)
|
|
||||||
return v._make_ip_policy_dict(model["ip_policy"])
|
|
||||||
|
|
||||||
def get_ip_policy(self, context, id):
|
|
||||||
LOG.info("get_ip_policy %s for tenant %s" % (id, context.tenant_id))
|
|
||||||
ipp = db_api.ip_policy_find(context, id=id, scope=db_api.ONE)
|
|
||||||
if not ipp:
|
|
||||||
raise quark_exceptions.IPPolicyNotFound(id=id)
|
|
||||||
return v._make_ip_policy_dict(ipp)
|
|
||||||
|
|
||||||
def get_ip_policies(self, context, **filters):
|
|
||||||
LOG.info("get_ip_policies for tenant %s" % (context.tenant_id))
|
|
||||||
ipps = db_api.ip_policy_find(context, scope=db_api.ALL, **filters)
|
|
||||||
return [v._make_ip_policy_dict(ipp) for ipp in ipps]
|
|
||||||
|
|
||||||
def delete_ip_policy(self, context, id):
|
|
||||||
LOG.info("delete_ip_policy %s for tenant %s" % (id, context.tenant_id))
|
|
||||||
ipp = db_api.ip_policy_find(context, id=id, scope=db_api.ONE)
|
|
||||||
if not ipp:
|
|
||||||
raise quark_exceptions.IPPolicyNotFound(id=id)
|
|
||||||
db_api.ip_policy_delete(context, ipp)
|
|
||||||
|
|
||||||
def get_mac_address_range(self, context, id, fields=None):
|
def get_mac_address_range(self, context, id, fields=None):
|
||||||
return mac_address_ranges.get_mac_address_range(context, id, fields)
|
return mac_address_ranges.get_mac_address_range(context, id, fields)
|
||||||
|
|
||||||
@@ -1043,3 +991,15 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
|||||||
def update_security_group(self, context, id, security_group):
|
def update_security_group(self, context, id, security_group):
|
||||||
return security_groups.update_security_group(context, id,
|
return security_groups.update_security_group(context, id,
|
||||||
security_group)
|
security_group)
|
||||||
|
|
||||||
|
def create_ip_policy(self, context, ip_policy):
|
||||||
|
return ip_policies.create_ip_policy(context, ip_policy)
|
||||||
|
|
||||||
|
def get_ip_policy(self, context, id):
|
||||||
|
return ip_policies.get_ip_policy(context, id)
|
||||||
|
|
||||||
|
def get_ip_policies(self, context, **filters):
|
||||||
|
return ip_policies.get_ip_policies(context, **filters)
|
||||||
|
|
||||||
|
def delete_ip_policy(self, context, id):
|
||||||
|
return ip_policies.delete_ip_policy(context, id)
|
||||||
|
|||||||
86
quark/plugin_modules/ip_policies.py
Normal file
86
quark/plugin_modules/ip_policies.py
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
# Copyright 2013 Openstack Foundation
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import netaddr
|
||||||
|
|
||||||
|
from neutron.common import exceptions
|
||||||
|
from neutron.openstack.common import log as logging
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
|
from quark.db import api as db_api
|
||||||
|
from quark import exceptions as quark_exceptions
|
||||||
|
from quark import plugin_views as v
|
||||||
|
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
LOG = logging.getLogger("neutron.quark")
|
||||||
|
DEFAULT_SG_UUID = "00000000-0000-0000-0000-000000000000"
|
||||||
|
|
||||||
|
|
||||||
|
def create_ip_policy(context, ip_policy):
|
||||||
|
LOG.info("create_ip_policy for tenant %s" % context.tenant_id)
|
||||||
|
|
||||||
|
ipp = ip_policy["ip_policy"]
|
||||||
|
|
||||||
|
if not ipp.get("exclude"):
|
||||||
|
raise exceptions.BadRequest(resource="ip_policy",
|
||||||
|
msg="Empty ip_policy.exclude regions")
|
||||||
|
|
||||||
|
ipp["exclude"] = netaddr.IPSet(ipp["exclude"])
|
||||||
|
network_id = ipp.get("network_id")
|
||||||
|
subnet_id = ipp.get("subnet_id")
|
||||||
|
|
||||||
|
model = None
|
||||||
|
if subnet_id:
|
||||||
|
model = db_api.subnet_find(context, id=subnet_id, scope=db_api.ONE)
|
||||||
|
if not model:
|
||||||
|
raise exceptions.SubnetNotFound(id=subnet_id)
|
||||||
|
elif network_id:
|
||||||
|
model = db_api.network_find(context, id=network_id,
|
||||||
|
scope=db_api.ONE)
|
||||||
|
if not model:
|
||||||
|
raise exceptions.NetworkNotFound(id=network_id)
|
||||||
|
else:
|
||||||
|
raise exceptions.BadRequest(
|
||||||
|
resource="ip_policy",
|
||||||
|
msg="network_id or subnet_id unspecified")
|
||||||
|
|
||||||
|
if model["ip_policy"]:
|
||||||
|
raise quark_exceptions.IPPolicyAlreadyExists(
|
||||||
|
id=model["ip_policy"]["id"], n_id=model["id"])
|
||||||
|
model["ip_policy"] = db_api.ip_policy_create(context, **ipp)
|
||||||
|
return v._make_ip_policy_dict(model["ip_policy"])
|
||||||
|
|
||||||
|
|
||||||
|
def get_ip_policy(context, id):
|
||||||
|
LOG.info("get_ip_policy %s for tenant %s" % (id, context.tenant_id))
|
||||||
|
ipp = db_api.ip_policy_find(context, id=id, scope=db_api.ONE)
|
||||||
|
if not ipp:
|
||||||
|
raise quark_exceptions.IPPolicyNotFound(id=id)
|
||||||
|
return v._make_ip_policy_dict(ipp)
|
||||||
|
|
||||||
|
|
||||||
|
def get_ip_policies(context, **filters):
|
||||||
|
LOG.info("get_ip_policies for tenant %s" % (context.tenant_id))
|
||||||
|
ipps = db_api.ip_policy_find(context, scope=db_api.ALL, **filters)
|
||||||
|
return [v._make_ip_policy_dict(ipp) for ipp in ipps]
|
||||||
|
|
||||||
|
|
||||||
|
def delete_ip_policy(context, id):
|
||||||
|
LOG.info("delete_ip_policy %s for tenant %s" % (id, context.tenant_id))
|
||||||
|
ipp = db_api.ip_policy_find(context, id=id, scope=db_api.ONE)
|
||||||
|
if not ipp:
|
||||||
|
raise quark_exceptions.IPPolicyNotFound(id=id)
|
||||||
|
db_api.ip_policy_delete(context, ipp)
|
||||||
181
quark/tests/plugin_modules/test_ip_policies.py
Normal file
181
quark/tests/plugin_modules/test_ip_policies.py
Normal file
@@ -0,0 +1,181 @@
|
|||||||
|
# Copyright 2013 Openstack Foundation
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
|
||||||
|
import mock
|
||||||
|
import netaddr
|
||||||
|
from neutron.common import exceptions
|
||||||
|
|
||||||
|
from quark import exceptions as quark_exceptions
|
||||||
|
from quark.tests import test_quark_plugin
|
||||||
|
|
||||||
|
|
||||||
|
class TestQuarkGetIpPolicies(test_quark_plugin.TestQuarkPlugin):
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def _stubs(self, ip_policy):
|
||||||
|
db_mod = "quark.db.api"
|
||||||
|
with mock.patch("%s.ip_policy_find" % db_mod) as ip_policy_find:
|
||||||
|
ip_policy_find.return_value = ip_policy
|
||||||
|
yield
|
||||||
|
|
||||||
|
def test_get_ip_policy_not_found(self):
|
||||||
|
with self._stubs(None):
|
||||||
|
with self.assertRaises(quark_exceptions.IPPolicyNotFound):
|
||||||
|
self.plugin.get_ip_policy(self.context, 1)
|
||||||
|
|
||||||
|
def test_get_ip_policy(self):
|
||||||
|
address = int(netaddr.IPAddress("1.1.1.1"))
|
||||||
|
ip_policy = dict(
|
||||||
|
id=1,
|
||||||
|
subnet_id=1,
|
||||||
|
network_id=2,
|
||||||
|
exclude=[dict(address=address, prefix=24)])
|
||||||
|
with self._stubs(ip_policy):
|
||||||
|
resp = self.plugin.get_ip_policy(self.context, 1)
|
||||||
|
self.assertEqual(len(resp.keys()), 4)
|
||||||
|
self.assertEqual(resp["id"], 1)
|
||||||
|
self.assertEqual(resp["subnet_id"], 1)
|
||||||
|
self.assertEqual(resp["network_id"], 2)
|
||||||
|
self.assertEqual(resp["exclude"], ["1.1.1.1/24"])
|
||||||
|
|
||||||
|
def test_get_ip_policies(self):
|
||||||
|
address = int(netaddr.IPAddress("1.1.1.1"))
|
||||||
|
ip_policy = dict(
|
||||||
|
id=1,
|
||||||
|
subnet_id=1,
|
||||||
|
network_id=2,
|
||||||
|
exclude=[dict(address=address, prefix=24)])
|
||||||
|
with self._stubs([ip_policy]):
|
||||||
|
resp = self.plugin.get_ip_policies(self.context)
|
||||||
|
self.assertEqual(len(resp), 1)
|
||||||
|
resp = resp[0]
|
||||||
|
self.assertEqual(len(resp.keys()), 4)
|
||||||
|
self.assertEqual(resp["id"], 1)
|
||||||
|
self.assertEqual(resp["subnet_id"], 1)
|
||||||
|
self.assertEqual(resp["network_id"], 2)
|
||||||
|
self.assertEqual(resp["exclude"], ["1.1.1.1/24"])
|
||||||
|
|
||||||
|
|
||||||
|
class TestQuarkCreateIpPolicies(test_quark_plugin.TestQuarkPlugin):
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def _stubs(self, ip_policy, subnet=None, net=None):
|
||||||
|
db_mod = "quark.db.api"
|
||||||
|
with contextlib.nested(
|
||||||
|
mock.patch("%s.subnet_find" % db_mod),
|
||||||
|
mock.patch("%s.network_find" % db_mod),
|
||||||
|
mock.patch("%s.ip_policy_create" % db_mod),
|
||||||
|
) as (subnet_find, net_find, ip_policy_create):
|
||||||
|
subnet_find.return_value = subnet
|
||||||
|
net_find.return_value = net
|
||||||
|
ip_policy_create.return_value = ip_policy
|
||||||
|
yield ip_policy_create
|
||||||
|
|
||||||
|
def test_create_ip_policy_invalid_body_missing_exclude(self):
|
||||||
|
with self._stubs(None):
|
||||||
|
with self.assertRaises(exceptions.BadRequest):
|
||||||
|
self.plugin.create_ip_policy(self.context, dict(
|
||||||
|
ip_policy=dict()))
|
||||||
|
|
||||||
|
def test_create_ip_policy_invalid_body_missing_netsubnet(self):
|
||||||
|
with self._stubs(None):
|
||||||
|
with self.assertRaises(exceptions.BadRequest):
|
||||||
|
self.plugin.create_ip_policy(self.context, dict(
|
||||||
|
ip_policy=dict(exclude=["1.1.1.1/24"])))
|
||||||
|
|
||||||
|
def test_create_ip_policy_invalid_subnet(self):
|
||||||
|
with self._stubs(None):
|
||||||
|
with self.assertRaises(exceptions.SubnetNotFound):
|
||||||
|
self.plugin.create_ip_policy(self.context, dict(
|
||||||
|
ip_policy=dict(subnet_id=1,
|
||||||
|
exclude=["1.1.1.1/24"])))
|
||||||
|
|
||||||
|
def test_create_ip_policy_invalid_network(self):
|
||||||
|
with self._stubs(None):
|
||||||
|
with self.assertRaises(exceptions.NetworkNotFound):
|
||||||
|
self.plugin.create_ip_policy(self.context, dict(
|
||||||
|
ip_policy=dict(network_id=1,
|
||||||
|
exclude=["1.1.1.1/24"])))
|
||||||
|
|
||||||
|
def test_create_ip_policy_network_ip_policy_already_exists(self):
|
||||||
|
with self._stubs(None, net=dict(id=1, ip_policy=dict(id=2))):
|
||||||
|
with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists):
|
||||||
|
self.plugin.create_ip_policy(self.context, dict(
|
||||||
|
ip_policy=dict(network_id=1,
|
||||||
|
exclude=["1.1.1.1/24"])))
|
||||||
|
|
||||||
|
def test_create_ip_policy_subnet_ip_policy_already_exists(self):
|
||||||
|
with self._stubs(None, subnet=dict(id=1, ip_policy=dict(id=2))):
|
||||||
|
with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists):
|
||||||
|
self.plugin.create_ip_policy(self.context, dict(
|
||||||
|
ip_policy=dict(subnet_id=1,
|
||||||
|
exclude=["1.1.1.1/24"])))
|
||||||
|
|
||||||
|
def test_create_ip_policy_network(self):
|
||||||
|
ipp = dict(subnet_id=None, network_id=1,
|
||||||
|
exclude=[dict(address=int(netaddr.IPAddress("1.1.1.1")),
|
||||||
|
prefix=24)])
|
||||||
|
with self._stubs(ipp, net=dict(id=1, ip_policy=dict(id=2))):
|
||||||
|
with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists):
|
||||||
|
resp = self.plugin.create_ip_policy(self.context, dict(
|
||||||
|
ip_policy=dict(network_id=1,
|
||||||
|
exclude=["1.1.1.1/24"])))
|
||||||
|
self.assertEqual(len(resp.keys()), 3)
|
||||||
|
self.assertIsNone(resp["subnet_id"])
|
||||||
|
self.assertEqual(resp["network_id"], 1)
|
||||||
|
self.assertEqual(resp["exclude"], ["1.1.1.1/24"])
|
||||||
|
|
||||||
|
def test_create_ip_policy_subnet(self):
|
||||||
|
ipp = dict(subnet_id=1, network_id=None,
|
||||||
|
exclude=[dict(address=int(netaddr.IPAddress("1.1.1.1")),
|
||||||
|
prefix=24)])
|
||||||
|
with self._stubs(ipp, subnet=dict(id=1, ip_policy=dict(id=2))):
|
||||||
|
with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists):
|
||||||
|
resp = self.plugin.create_ip_policy(self.context, dict(
|
||||||
|
ip_policy=dict(subnet_id=1,
|
||||||
|
exclude=["1.1.1.1/24"])))
|
||||||
|
self.assertEqual(len(resp.keys()), 3)
|
||||||
|
self.assertEqual(resp["subnet_id"], 1)
|
||||||
|
self.assertIsNone(resp["network_id"])
|
||||||
|
self.assertEqual(resp["exclude"], ["1.1.1.1/24"])
|
||||||
|
|
||||||
|
|
||||||
|
class TestQuarkDeleteIpPolicies(test_quark_plugin.TestQuarkPlugin):
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def _stubs(self, ip_policy):
|
||||||
|
db_mod = "quark.db.api"
|
||||||
|
with contextlib.nested(
|
||||||
|
mock.patch("%s.ip_policy_find" % db_mod),
|
||||||
|
mock.patch("%s.ip_policy_delete" % db_mod),
|
||||||
|
) as (ip_policy_find, ip_policy_delete):
|
||||||
|
ip_policy_find.return_value = ip_policy
|
||||||
|
yield ip_policy_find, ip_policy_delete
|
||||||
|
|
||||||
|
def test_delete_ip_policy_not_found(self):
|
||||||
|
with self._stubs(None):
|
||||||
|
with self.assertRaises(quark_exceptions.IPPolicyNotFound):
|
||||||
|
self.plugin.delete_ip_policy(self.context, 1)
|
||||||
|
|
||||||
|
def test_delete_ip_policy(self):
|
||||||
|
address = int(netaddr.IPAddress("1.1.1.1"))
|
||||||
|
ip_policy = dict(
|
||||||
|
id=1,
|
||||||
|
subnet_id=1,
|
||||||
|
network_id=2,
|
||||||
|
exclude=[dict(address=address, prefix=24)])
|
||||||
|
with self._stubs(ip_policy) as (ip_policy_find, ip_policy_delete):
|
||||||
|
self.plugin.delete_ip_policy(self.context, 1)
|
||||||
|
self.assertEqual(ip_policy_find.call_count, 1)
|
||||||
|
self.assertEqual(ip_policy_delete.call_count, 1)
|
||||||
@@ -18,7 +18,6 @@ import uuid
|
|||||||
import contextlib
|
import contextlib
|
||||||
import copy
|
import copy
|
||||||
import mock
|
import mock
|
||||||
import netaddr
|
|
||||||
from neutron.api.v2 import attributes as neutron_attrs
|
from neutron.api.v2 import attributes as neutron_attrs
|
||||||
from neutron.common import exceptions
|
from neutron.common import exceptions
|
||||||
from neutron.db import api as db_api
|
from neutron.db import api as db_api
|
||||||
@@ -1739,161 +1738,3 @@ class TestQuarkGetIpAddresses(TestQuarkPlugin):
|
|||||||
with self._stubs(ips=None, ports=[port]):
|
with self._stubs(ips=None, ports=[port]):
|
||||||
with self.assertRaises(quark_exceptions.IpAddressNotFound):
|
with self.assertRaises(quark_exceptions.IpAddressNotFound):
|
||||||
self.plugin.get_ip_address(self.context, 1)
|
self.plugin.get_ip_address(self.context, 1)
|
||||||
|
|
||||||
|
|
||||||
class TestQuarkGetIpPolicies(TestQuarkPlugin):
|
|
||||||
@contextlib.contextmanager
|
|
||||||
def _stubs(self, ip_policy):
|
|
||||||
db_mod = "quark.db.api"
|
|
||||||
with mock.patch("%s.ip_policy_find" % db_mod) as ip_policy_find:
|
|
||||||
ip_policy_find.return_value = ip_policy
|
|
||||||
yield
|
|
||||||
|
|
||||||
def test_get_ip_policy_not_found(self):
|
|
||||||
with self._stubs(None):
|
|
||||||
with self.assertRaises(quark_exceptions.IPPolicyNotFound):
|
|
||||||
self.plugin.get_ip_policy(self.context, 1)
|
|
||||||
|
|
||||||
def test_get_ip_policy(self):
|
|
||||||
address = int(netaddr.IPAddress("1.1.1.1"))
|
|
||||||
ip_policy = dict(
|
|
||||||
id=1,
|
|
||||||
subnet_id=1,
|
|
||||||
network_id=2,
|
|
||||||
exclude=[dict(address=address, prefix=24)])
|
|
||||||
with self._stubs(ip_policy):
|
|
||||||
resp = self.plugin.get_ip_policy(self.context, 1)
|
|
||||||
self.assertEqual(len(resp.keys()), 4)
|
|
||||||
self.assertEqual(resp["id"], 1)
|
|
||||||
self.assertEqual(resp["subnet_id"], 1)
|
|
||||||
self.assertEqual(resp["network_id"], 2)
|
|
||||||
self.assertEqual(resp["exclude"], ["1.1.1.1/24"])
|
|
||||||
|
|
||||||
def test_get_ip_policies(self):
|
|
||||||
address = int(netaddr.IPAddress("1.1.1.1"))
|
|
||||||
ip_policy = dict(
|
|
||||||
id=1,
|
|
||||||
subnet_id=1,
|
|
||||||
network_id=2,
|
|
||||||
exclude=[dict(address=address, prefix=24)])
|
|
||||||
with self._stubs([ip_policy]):
|
|
||||||
resp = self.plugin.get_ip_policies(self.context)
|
|
||||||
self.assertEqual(len(resp), 1)
|
|
||||||
resp = resp[0]
|
|
||||||
self.assertEqual(len(resp.keys()), 4)
|
|
||||||
self.assertEqual(resp["id"], 1)
|
|
||||||
self.assertEqual(resp["subnet_id"], 1)
|
|
||||||
self.assertEqual(resp["network_id"], 2)
|
|
||||||
self.assertEqual(resp["exclude"], ["1.1.1.1/24"])
|
|
||||||
|
|
||||||
|
|
||||||
class TestQuarkCreateIpPolicies(TestQuarkPlugin):
|
|
||||||
@contextlib.contextmanager
|
|
||||||
def _stubs(self, ip_policy, subnet=None, net=None):
|
|
||||||
db_mod = "quark.db.api"
|
|
||||||
with contextlib.nested(
|
|
||||||
mock.patch("%s.subnet_find" % db_mod),
|
|
||||||
mock.patch("%s.network_find" % db_mod),
|
|
||||||
mock.patch("%s.ip_policy_create" % db_mod),
|
|
||||||
) as (subnet_find, net_find, ip_policy_create):
|
|
||||||
subnet_find.return_value = subnet
|
|
||||||
net_find.return_value = net
|
|
||||||
ip_policy_create.return_value = ip_policy
|
|
||||||
yield ip_policy_create
|
|
||||||
|
|
||||||
def test_create_ip_policy_invalid_body_missing_exclude(self):
|
|
||||||
with self._stubs(None):
|
|
||||||
with self.assertRaises(exceptions.BadRequest):
|
|
||||||
self.plugin.create_ip_policy(self.context, dict(
|
|
||||||
ip_policy=dict()))
|
|
||||||
|
|
||||||
def test_create_ip_policy_invalid_body_missing_netsubnet(self):
|
|
||||||
with self._stubs(None):
|
|
||||||
with self.assertRaises(exceptions.BadRequest):
|
|
||||||
self.plugin.create_ip_policy(self.context, dict(
|
|
||||||
ip_policy=dict(exclude=["1.1.1.1/24"])))
|
|
||||||
|
|
||||||
def test_create_ip_policy_invalid_subnet(self):
|
|
||||||
with self._stubs(None):
|
|
||||||
with self.assertRaises(exceptions.SubnetNotFound):
|
|
||||||
self.plugin.create_ip_policy(self.context, dict(
|
|
||||||
ip_policy=dict(subnet_id=1,
|
|
||||||
exclude=["1.1.1.1/24"])))
|
|
||||||
|
|
||||||
def test_create_ip_policy_invalid_network(self):
|
|
||||||
with self._stubs(None):
|
|
||||||
with self.assertRaises(exceptions.NetworkNotFound):
|
|
||||||
self.plugin.create_ip_policy(self.context, dict(
|
|
||||||
ip_policy=dict(network_id=1,
|
|
||||||
exclude=["1.1.1.1/24"])))
|
|
||||||
|
|
||||||
def test_create_ip_policy_network_ip_policy_already_exists(self):
|
|
||||||
with self._stubs(None, net=dict(id=1, ip_policy=dict(id=2))):
|
|
||||||
with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists):
|
|
||||||
self.plugin.create_ip_policy(self.context, dict(
|
|
||||||
ip_policy=dict(network_id=1,
|
|
||||||
exclude=["1.1.1.1/24"])))
|
|
||||||
|
|
||||||
def test_create_ip_policy_subnet_ip_policy_already_exists(self):
|
|
||||||
with self._stubs(None, subnet=dict(id=1, ip_policy=dict(id=2))):
|
|
||||||
with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists):
|
|
||||||
self.plugin.create_ip_policy(self.context, dict(
|
|
||||||
ip_policy=dict(subnet_id=1,
|
|
||||||
exclude=["1.1.1.1/24"])))
|
|
||||||
|
|
||||||
def test_create_ip_policy_network(self):
|
|
||||||
ipp = dict(subnet_id=None, network_id=1,
|
|
||||||
exclude=[dict(address=int(netaddr.IPAddress("1.1.1.1")),
|
|
||||||
prefix=24)])
|
|
||||||
with self._stubs(ipp, net=dict(id=1, ip_policy=dict(id=2))):
|
|
||||||
with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists):
|
|
||||||
resp = self.plugin.create_ip_policy(self.context, dict(
|
|
||||||
ip_policy=dict(network_id=1,
|
|
||||||
exclude=["1.1.1.1/24"])))
|
|
||||||
self.assertEqual(len(resp.keys()), 3)
|
|
||||||
self.assertIsNone(resp["subnet_id"])
|
|
||||||
self.assertEqual(resp["network_id"], 1)
|
|
||||||
self.assertEqual(resp["exclude"], ["1.1.1.1/24"])
|
|
||||||
|
|
||||||
def test_create_ip_policy_subnet(self):
|
|
||||||
ipp = dict(subnet_id=1, network_id=None,
|
|
||||||
exclude=[dict(address=int(netaddr.IPAddress("1.1.1.1")),
|
|
||||||
prefix=24)])
|
|
||||||
with self._stubs(ipp, subnet=dict(id=1, ip_policy=dict(id=2))):
|
|
||||||
with self.assertRaises(quark_exceptions.IPPolicyAlreadyExists):
|
|
||||||
resp = self.plugin.create_ip_policy(self.context, dict(
|
|
||||||
ip_policy=dict(subnet_id=1,
|
|
||||||
exclude=["1.1.1.1/24"])))
|
|
||||||
self.assertEqual(len(resp.keys()), 3)
|
|
||||||
self.assertEqual(resp["subnet_id"], 1)
|
|
||||||
self.assertIsNone(resp["network_id"])
|
|
||||||
self.assertEqual(resp["exclude"], ["1.1.1.1/24"])
|
|
||||||
|
|
||||||
|
|
||||||
class TestQuarkDeleteIpPolicies(TestQuarkPlugin):
|
|
||||||
@contextlib.contextmanager
|
|
||||||
def _stubs(self, ip_policy):
|
|
||||||
db_mod = "quark.db.api"
|
|
||||||
with contextlib.nested(
|
|
||||||
mock.patch("%s.ip_policy_find" % db_mod),
|
|
||||||
mock.patch("%s.ip_policy_delete" % db_mod),
|
|
||||||
) as (ip_policy_find, ip_policy_delete):
|
|
||||||
ip_policy_find.return_value = ip_policy
|
|
||||||
yield ip_policy_find, ip_policy_delete
|
|
||||||
|
|
||||||
def test_delete_ip_policy_not_found(self):
|
|
||||||
with self._stubs(None):
|
|
||||||
with self.assertRaises(quark_exceptions.IPPolicyNotFound):
|
|
||||||
self.plugin.delete_ip_policy(self.context, 1)
|
|
||||||
|
|
||||||
def test_delete_ip_policy(self):
|
|
||||||
address = int(netaddr.IPAddress("1.1.1.1"))
|
|
||||||
ip_policy = dict(
|
|
||||||
id=1,
|
|
||||||
subnet_id=1,
|
|
||||||
network_id=2,
|
|
||||||
exclude=[dict(address=address, prefix=24)])
|
|
||||||
with self._stubs(ip_policy) as (ip_policy_find, ip_policy_delete):
|
|
||||||
self.plugin.delete_ip_policy(self.context, 1)
|
|
||||||
self.assertEqual(ip_policy_find.call_count, 1)
|
|
||||||
self.assertEqual(ip_policy_delete.call_count, 1)
|
|
||||||
|
|||||||
Reference in New Issue
Block a user