Continues updating tests with new db_api calls
Adds several more tests to Quark utilizing the DB API as well as some more cleanup.
This commit is contained in:
@@ -30,39 +30,34 @@ ALL = "all"
|
||||
|
||||
|
||||
def _model_query(context, model, filters, query, fields=None):
|
||||
filters = filters or {}
|
||||
for key in ["name", "network_id", "id", "device_id", "tenant_id",
|
||||
"mac_address"]:
|
||||
if key in filters:
|
||||
if not filters[key]:
|
||||
continue
|
||||
listified = filters[key]
|
||||
if not isinstance(listified, list):
|
||||
listified = [listified]
|
||||
filters[key] = listified
|
||||
|
||||
if filters.get("name"):
|
||||
names = filters["name"]
|
||||
if not isinstance(filters["name"], list):
|
||||
names = [names]
|
||||
query = query.filter(
|
||||
model.name.in_(names))
|
||||
query = query.filter(model.name.in_(filters["name"]))
|
||||
|
||||
if filters.get("network_id"):
|
||||
ids = filters["network_id"]
|
||||
if not isinstance(filters["network_id"], list):
|
||||
ids = [ids]
|
||||
query = query.filter(
|
||||
model.network_id.in_(ids))
|
||||
query = query.filter(model.network_id.in_(filters["network_id"]))
|
||||
|
||||
if filters.get("device_id"):
|
||||
ids = filters["device_id"]
|
||||
if not isinstance(filters["device_id"], list):
|
||||
ids = [ids]
|
||||
query = query.filter(model.device_id.in_(ids))
|
||||
query = query.filter(model.device_id.in_(filters["device_id"]))
|
||||
|
||||
if filters.get("mac_address"):
|
||||
addrs = filters["mac_address"]
|
||||
if not isinstance(filters["mac_address"], list):
|
||||
addrs = [addrs]
|
||||
query = query.filter(
|
||||
model.mac_address.in_(addrs))
|
||||
query = query.filter(model.mac_address.in_(filters["mac_address"]))
|
||||
|
||||
if filters.get("tenant_id"):
|
||||
ids = filters["tenant_id"]
|
||||
if not isinstance(filters["tenant_id"], list):
|
||||
ids = [ids]
|
||||
query = query.filter(
|
||||
model.tenant_id.in_(ids))
|
||||
query = query.filter(model.tenant_id.in_(filters["tenant_id"]))
|
||||
|
||||
if filters.get("id"):
|
||||
query = query.filter(model.id.in_(filters["id"]))
|
||||
|
||||
if filters.get("reuse_after"):
|
||||
reuse_after = filters["reuse_after"]
|
||||
@@ -78,12 +73,6 @@ def _model_query(context, model, filters, query, fields=None):
|
||||
query = query.filter(model.deallocated ==
|
||||
filters["deallocated"])
|
||||
|
||||
if filters.get("id"):
|
||||
ids = filters["id"]
|
||||
if not isinstance(filters["id"], list):
|
||||
ids = [ids]
|
||||
query = query.filter(model.id.in_(ids))
|
||||
|
||||
if filters.get("order_by"):
|
||||
query = query.order_by(filters["order_by"])
|
||||
|
||||
@@ -122,13 +111,10 @@ def scoped(f):
|
||||
|
||||
|
||||
@scoped
|
||||
def port_find(context, fields=None, **filters):
|
||||
if "id" in filters and not isinstance(filters["id"], list):
|
||||
filters["id"] = [filters["id"]]
|
||||
def port_find(context, **filters):
|
||||
query = context.session.query(models.Port).\
|
||||
options(orm.joinedload(models.Port.ip_addresses))
|
||||
query = _model_query(context, models.Port, filters, fields=fields,
|
||||
query=query)
|
||||
query = _model_query(context, models.Port, filters, query=query)
|
||||
return query
|
||||
|
||||
|
||||
@@ -290,6 +276,7 @@ def subnet_delete(context, subnet):
|
||||
def subnet_create(context, **subnet_dict):
|
||||
subnet = models.Subnet()
|
||||
subnet.update(subnet_dict)
|
||||
subnet["tenant_id"] = context.tenant_id
|
||||
context.session.add(subnet)
|
||||
return subnet
|
||||
|
||||
|
@@ -36,10 +36,9 @@ class QuarkIpam(object):
|
||||
if version:
|
||||
filters["version"] = version
|
||||
subnets = db_api.subnet_find_allocation_counts(context, net_id,
|
||||
scope=db_api.ALL,
|
||||
**filters)
|
||||
for subnet, ips_in_subnet in subnets:
|
||||
if not subnet:
|
||||
break
|
||||
ipnet = netaddr.IPNetwork(subnet["cidr"])
|
||||
if ip_address and ip_address not in ipnet:
|
||||
continue
|
||||
|
@@ -115,13 +115,13 @@ class Plugin(quantum_plugin_base_v2.QuantumPluginBaseV2):
|
||||
# TODO(mdietz): this is a hack to get nova to boot. We want to get the
|
||||
# "default" route out of the database and use that
|
||||
gateway_ip = "0.0.0.0"
|
||||
subnet["allocation_pools"] = subnet.get("allocation_pools") or {}
|
||||
subnet["dns_nameservers"] = subnet.get("dns_nameservers") or {}
|
||||
return {"id": subnet.get('id'),
|
||||
"name": subnet.get('id'),
|
||||
"tenant_id": subnet.get('tenant_id'),
|
||||
"network_id": subnet.get('network_id'),
|
||||
"ip_version": subnet.get('ip_version'),
|
||||
"allocation_pools": subnet.get("allocation_pools") or [],
|
||||
"dns_nameservers": subnet.get("dns_nameservers") or [],
|
||||
"cidr": subnet.get('cidr'),
|
||||
"enable_dhcp": subnet.get('enable_dhcp'),
|
||||
"gateway_ip": gateway_ip}
|
||||
@@ -129,19 +129,6 @@ class Plugin(quantum_plugin_base_v2.QuantumPluginBaseV2):
|
||||
def _make_subnet_dict(self, subnet, fields=None):
|
||||
res = self._subnet_dict(subnet, fields)
|
||||
res["routes"] = [self._make_route_dict(r) for r in subnet["routes"]]
|
||||
#'dns_nameservers': [dns.get('address')
|
||||
# for dns in subnet.get('dns_nameservers')],
|
||||
#'host_routes': [{'destination': route.get('destination'),
|
||||
# 'nexthop': route.get('nexthop')}
|
||||
# for route in subnet.get('routes', [])],
|
||||
#'shared': subnet.get('shared')
|
||||
#}
|
||||
#'allocation_pools': [{'start': pool.get('first_ip'),
|
||||
# 'end': pool.get('last_ip')}
|
||||
# for pool in subnet.get('allocation_pools')],
|
||||
#}
|
||||
#if subnet.get('gateway_ip'):
|
||||
# res['gateway_ip'] = subnet.get('gateway_ip')
|
||||
return res
|
||||
|
||||
def _port_dict(self, port, fields=None):
|
||||
@@ -179,17 +166,7 @@ class Plugin(quantum_plugin_base_v2.QuantumPluginBaseV2):
|
||||
port_dict["fixed_ips"] = [self._make_port_address_dict(addr)
|
||||
for addr in port.ip_addresses]
|
||||
ports.append(port_dict)
|
||||
LOG.critical(ports)
|
||||
return ports
|
||||
#for port_dict, addr_dict in query:
|
||||
# port_id = port_dict["id"]
|
||||
# if port_id not in ports:
|
||||
# ports[port_id] = self._port_dict(port_dict, fields)
|
||||
# ports[port_id]["fixed_ips"] = []
|
||||
# if addr_dict:
|
||||
# ports[port_id]["fixed_ips"].append(
|
||||
# self._make_port_address_dict(addr_dict))
|
||||
#return ports.values()
|
||||
|
||||
def _make_subnets_list(self, query, fields=None):
|
||||
subnets = []
|
||||
@@ -229,7 +206,11 @@ class Plugin(quantum_plugin_base_v2.QuantumPluginBaseV2):
|
||||
quantum/api/v2/attributes.py. All keys will be populated.
|
||||
"""
|
||||
LOG.info("create_subnet for tenant %s" % context.tenant_id)
|
||||
subnet["subnet"]["tenant_id"] = context.tenant_id
|
||||
net_id = subnet["subnet"]["network_id"]
|
||||
net = db_api.network_find(context, net_id, scope=db_api.ONE)
|
||||
if not net:
|
||||
raise exceptions.NetworkNotFound(net_id=net_id)
|
||||
|
||||
new_subnet = db_api.subnet_create(context, **subnet["subnet"])
|
||||
subnet_dict = self._make_subnet_dict(new_subnet)
|
||||
return subnet_dict
|
||||
@@ -582,7 +563,7 @@ class Plugin(quantum_plugin_base_v2.QuantumPluginBaseV2):
|
||||
"""
|
||||
LOG.info("get_ports for tenant %s filters %s fields %s" %
|
||||
(context.tenant_id, filters, fields))
|
||||
query = db_api.port_find(context, fields, **filters)
|
||||
query = db_api.port_find(context, fields, filters)
|
||||
return self._make_ports_list(query, fields)
|
||||
|
||||
def get_ports_count(self, context, filters=None):
|
||||
|
@@ -192,7 +192,8 @@ class QuarkNewIPAddressAllocation(QuarkIpamBaseTest):
|
||||
subnet = dict(id=1, first_ip=0, last_ip=255,
|
||||
cidr="0.0.0.0/24", ip_version=4)
|
||||
with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]):
|
||||
address = self.ipam.allocate_ip_address(self.context, 0, 0, 0)
|
||||
address = self.ipam.allocate_ip_address(self.context, 0, 0, 0,
|
||||
version=4)
|
||||
self.assertEqual(address["address"], 0)
|
||||
|
||||
def test_allocate_new_ip_in_partially_allocated_range(self):
|
||||
@@ -277,14 +278,13 @@ class QuarkIPAddressAllocateDeallocated(QuarkIpamBaseTest):
|
||||
addr_find.side_effect = [None, updated_address]
|
||||
addr_update.return_value = updated_address
|
||||
choose_subnet.return_value = subnet
|
||||
yield
|
||||
yield choose_subnet
|
||||
|
||||
def test_allocate_finds_deallocated_ip_succeeds(self):
|
||||
with self._stubs():
|
||||
with self._stubs() as choose_subnet:
|
||||
ipaddress = self.ipam.allocate_ip_address(self.context, 0, 0, 0)
|
||||
self.assertIsNotNone(ipaddress['id'])
|
||||
self.assertFalse(
|
||||
quark.ipam.QuarkIpam._choose_available_subnet.called)
|
||||
self.assertFalse(choose_subnet.called)
|
||||
|
||||
def test_allocate_finds_no_deallocated_creates_new_ip(self):
|
||||
'''Fails based on the choice of reuse_after argument.
|
||||
@@ -292,8 +292,7 @@ class QuarkIPAddressAllocateDeallocated(QuarkIpamBaseTest):
|
||||
Allocates new ip address instead of previously deallocated mac
|
||||
address.
|
||||
'''
|
||||
with self._stubs(ip_find=False):
|
||||
with self._stubs(ip_find=False) as choose_subnet:
|
||||
ipaddress = self.ipam.allocate_ip_address(self.context, 0, 0, 0)
|
||||
self.assertIsNone(ipaddress['id'])
|
||||
self.assertTrue(
|
||||
quark.ipam.QuarkIpam._choose_available_subnet.called)
|
||||
self.assertTrue(choose_subnet.called)
|
||||
|
@@ -13,13 +13,17 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import netaddr
|
||||
import uuid
|
||||
|
||||
import contextlib
|
||||
import mock
|
||||
import netaddr
|
||||
from oslo.config import cfg
|
||||
from quantum.common import exceptions
|
||||
from quantum import context
|
||||
from quantum.db import api as db_api
|
||||
|
||||
from quark.db import models
|
||||
import quark.plugin
|
||||
|
||||
from quark.tests import test_base
|
||||
@@ -54,20 +58,108 @@ class TestQuarkPlugin(test_base.TestBase):
|
||||
db_api.clear_db()
|
||||
|
||||
|
||||
class TestSubnets(TestQuarkPlugin):
|
||||
def test_allocated_ips_only(self):
|
||||
network_id = self._create_network()['id']
|
||||
self._create_subnet(network_id)
|
||||
self._create_mac_address_range()
|
||||
class TestQuarkGetSubnets(TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, subnets=None, routes=None):
|
||||
route_models = []
|
||||
for route in routes:
|
||||
r = models.Route()
|
||||
r.update(route)
|
||||
route_models.append(r)
|
||||
|
||||
port = self._create_port(network_id)
|
||||
self.assertTrue(len(port['fixed_ips']) >= 1)
|
||||
if isinstance(subnets, list):
|
||||
subnet_models = []
|
||||
for subnet in subnets:
|
||||
s_dict = subnet.copy()
|
||||
s_dict["routes"] = route_models
|
||||
s = models.Subnet()
|
||||
s.update(s_dict)
|
||||
subnet_models.append(s)
|
||||
else:
|
||||
mod = models.Subnet()
|
||||
mod.update(subnets)
|
||||
mod["routes"] = route_models
|
||||
subnet_models = mod
|
||||
|
||||
self.plugin.delete_port(self.context, port['id'])
|
||||
with mock.patch("quark.db.api.subnet_find") as subnet_find:
|
||||
subnet_find.return_value = subnet_models
|
||||
yield
|
||||
|
||||
# TODO(jkoelker) once the ip_addresses controller is in the api
|
||||
# grab the fixed_ip from that and make sure it has
|
||||
# no ports
|
||||
def test_subnets_list(self):
|
||||
subnet_id = str(uuid.uuid4())
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=subnet_id)
|
||||
|
||||
subnet = dict(id=subnet_id, network_id=1, name=subnet_id,
|
||||
tenant_id=self.context.tenant_id, ip_version=4,
|
||||
cidr="172.16.0.0/24", gateway_ip="0.0.0.0",
|
||||
allocation_pools=[], dns_nameservers=[],
|
||||
enable_dhcp=True)
|
||||
|
||||
with self._stubs(subnets=[subnet], routes=[route]):
|
||||
res = self.plugin.get_subnets(self.context, {}, {})
|
||||
|
||||
# Compare routes separately
|
||||
routes = res[0].pop("routes")
|
||||
for key in subnet.keys():
|
||||
self.assertEqual(res[0][key], subnet[key])
|
||||
for key in route.keys():
|
||||
self.assertEqual(routes[0][key], route[key])
|
||||
|
||||
def test_subnet_show(self):
|
||||
subnet_id = str(uuid.uuid4())
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=subnet_id)
|
||||
|
||||
subnet = dict(id=subnet_id, network_id=1, name=subnet_id,
|
||||
tenant_id=self.context.tenant_id, ip_version=4,
|
||||
cidr="172.16.0.0/24", gateway_ip="0.0.0.0",
|
||||
allocation_pools=[], dns_nameservers=[],
|
||||
enable_dhcp=True)
|
||||
|
||||
with self._stubs(subnets=subnet, routes=[route]):
|
||||
res = self.plugin.get_subnet(self.context, subnet_id)
|
||||
|
||||
# Compare routes separately
|
||||
routes = res.pop("routes")
|
||||
for key in subnet.keys():
|
||||
self.assertEqual(res[key], subnet[key])
|
||||
for key in route.keys():
|
||||
self.assertEqual(routes[0][key], route[key])
|
||||
|
||||
|
||||
class TestQuarkCreateSubnet(TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, subnet=None, network=None):
|
||||
subnet_mod = models.Subnet()
|
||||
subnet_mod.update(subnet)
|
||||
with contextlib.nested(
|
||||
mock.patch("quark.db.api.subnet_create"),
|
||||
mock.patch("quark.db.api.network_find"),
|
||||
) as (subnet_create, net_find):
|
||||
subnet_create.return_value = subnet_mod
|
||||
net_find.return_value = network
|
||||
yield
|
||||
|
||||
def test_create_subnet(self):
|
||||
subnet = dict(
|
||||
subnet=dict(network_id=1,
|
||||
tenant_id=self.context.tenant_id, ip_version=4,
|
||||
cidr="172.16.0.0/24", gateway_ip="0.0.0.0",
|
||||
allocation_pools=[], dns_nameservers=[],
|
||||
enable_dhcp=None))
|
||||
network = dict(network_id=1)
|
||||
with self._stubs(subnet=subnet["subnet"], network=network):
|
||||
res = self.plugin.create_subnet(self.context, subnet)
|
||||
for key in subnet["subnet"].keys():
|
||||
print key
|
||||
self.assertEqual(res[key], subnet["subnet"][key])
|
||||
|
||||
def test_create_subnet_no_network_fails(self):
|
||||
subnet = dict(subnet=dict(network_id=1))
|
||||
with self._stubs(subnet=dict(), network=None):
|
||||
with self.assertRaises(exceptions.NetworkNotFound):
|
||||
self.plugin.create_subnet(self.context, subnet)
|
||||
|
||||
|
||||
class TestIpAddresses(TestQuarkPlugin):
|
||||
@@ -315,3 +407,167 @@ class TestIpAddresses(TestQuarkPlugin):
|
||||
response['id'],
|
||||
ip_address)
|
||||
self.assertEqual(response['port_ids'], [])
|
||||
|
||||
|
||||
class TestQuarkGetPorts(TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, ports=None):
|
||||
port_models = []
|
||||
if isinstance(ports, list):
|
||||
for port in ports:
|
||||
port_model = models.Port()
|
||||
port_model.update(port)
|
||||
port_models.append(port_model)
|
||||
elif ports is None:
|
||||
port_models = None
|
||||
else:
|
||||
port_model = models.Port()
|
||||
port_model.update(ports)
|
||||
port_models = port_model
|
||||
|
||||
db_mod = "quark.db.api"
|
||||
with contextlib.nested(
|
||||
mock.patch("%s.port_find" % db_mod)
|
||||
) as (port_find,):
|
||||
port_find.return_value = port_models
|
||||
yield
|
||||
|
||||
def test_port_list_no_ports(self):
|
||||
with self._stubs(ports=[]):
|
||||
ports = self.plugin.get_ports(self.context, filters=None,
|
||||
fields=None)
|
||||
self.assertEqual(ports, [])
|
||||
|
||||
def test_port_list_with_ports(self):
|
||||
port = dict(mac_address="AA:BB:CC:DD:EE:FF", network_id=1,
|
||||
tenant_id=self.context.tenant_id, device_id=2)
|
||||
expected = {'status': None,
|
||||
'device_owner': None,
|
||||
'mac_address': 'AA:BB:CC:DD:EE:FF',
|
||||
'network_id': 1,
|
||||
'tenant_id': self.context.tenant_id,
|
||||
'admin_state_up': None,
|
||||
'fixed_ips': [],
|
||||
'device_id': 2}
|
||||
with self._stubs(ports=[port]):
|
||||
ports = self.plugin.get_ports(self.context, filters=None,
|
||||
fields=None)
|
||||
self.assertEqual(len(ports), 1)
|
||||
for key in expected.keys():
|
||||
self.assertEqual(ports[0][key], expected[key])
|
||||
|
||||
def test_port_show(self):
|
||||
port = dict(mac_address="AA:BB:CC:DD:EE:FF", network_id=1,
|
||||
tenant_id=self.context.tenant_id, device_id=2)
|
||||
expected = {'status': None,
|
||||
'device_owner': None,
|
||||
'mac_address': 'AA:BB:CC:DD:EE:FF',
|
||||
'network_id': 1,
|
||||
'tenant_id': self.context.tenant_id,
|
||||
'admin_state_up': None,
|
||||
'fixed_ips': [],
|
||||
'device_id': 2}
|
||||
with self._stubs(ports=port):
|
||||
result = self.plugin.get_port(self.context, 1)
|
||||
for key in expected.keys():
|
||||
self.assertEqual(result[key], expected[key])
|
||||
|
||||
def test_port_show_not_found(self):
|
||||
with self._stubs(ports=None):
|
||||
with self.assertRaises(exceptions.PortNotFound):
|
||||
self.plugin.get_port(self.context, 1)
|
||||
|
||||
|
||||
class TestQuarkCreatePort(TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, port=None, network=None, addr=None, mac=None):
|
||||
port_model = models.Port()
|
||||
port_model.update(port)
|
||||
port_models = port_model
|
||||
|
||||
db_mod = "quark.db.api"
|
||||
ipam = "quark.ipam.QuarkIpam"
|
||||
with contextlib.nested(
|
||||
mock.patch("%s.port_create" % db_mod),
|
||||
mock.patch("%s.network_find" % db_mod),
|
||||
mock.patch("%s.allocate_ip_address" % ipam),
|
||||
mock.patch("%s.allocate_mac_address" % ipam),
|
||||
) as (port_create, net_find, alloc_ip, alloc_mac):
|
||||
port_create.return_value = port_models
|
||||
net_find.return_value = network
|
||||
alloc_ip.return_value = addr
|
||||
alloc_mac.return_value = mac
|
||||
yield port_create
|
||||
|
||||
def test_create_port(self):
|
||||
network = dict(id=1)
|
||||
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||
ip = dict()
|
||||
port = dict(port=dict(mac_address=mac["address"], network_id=1,
|
||||
tenant_id=self.context.tenant_id, device_id=2))
|
||||
expected = {'status': None,
|
||||
'device_owner': None,
|
||||
'mac_address': mac["address"],
|
||||
'network_id': network["id"],
|
||||
'tenant_id': self.context.tenant_id,
|
||||
'admin_state_up': None,
|
||||
'fixed_ips': [],
|
||||
'device_id': 2}
|
||||
with self._stubs(port=port["port"], network=network, addr=ip,
|
||||
mac=mac) as port_create:
|
||||
result = self.plugin.create_port(self.context, port)
|
||||
self.assertTrue(port_create.called)
|
||||
for key in expected.keys():
|
||||
self.assertEqual(result[key], expected[key])
|
||||
|
||||
def test_create_port_no_network_found(self):
|
||||
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
|
||||
device_id=2))
|
||||
with self._stubs(network=None, port=port["port"]):
|
||||
with self.assertRaises(exceptions.NetworkNotFound):
|
||||
self.plugin.create_port(self.context, port)
|
||||
|
||||
|
||||
class TestQuarkUpdatePort(TestQuarkPlugin):
|
||||
def test_update_not_implemented(self):
|
||||
with self.assertRaises(NotImplementedError):
|
||||
self.plugin.update_port(self.context, 1, {})
|
||||
|
||||
|
||||
class TestQuarkDeletePort(TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, port=None, addr=None, mac=None):
|
||||
port_models = None
|
||||
if port:
|
||||
port_model = models.Port()
|
||||
port_model.update(port)
|
||||
port_models = port_model
|
||||
|
||||
db_mod = "quark.db.api"
|
||||
ipam = "quark.ipam.QuarkIpam"
|
||||
with contextlib.nested(
|
||||
mock.patch("%s.port_find" % db_mod),
|
||||
mock.patch("%s.deallocate_ip_address" % ipam),
|
||||
mock.patch("%s.deallocate_mac_address" % ipam),
|
||||
mock.patch("%s.port_delete" % db_mod),
|
||||
mock.patch("quark.drivers.base.BaseDriver.delete_port")
|
||||
) as (port_find, dealloc_ip, dealloc_mac, db_port_del,
|
||||
driver_port_del):
|
||||
port_find.return_value = port_models
|
||||
dealloc_ip.return_value = addr
|
||||
dealloc_mac.return_value = mac
|
||||
yield db_port_del, driver_port_del
|
||||
|
||||
def test_port_delete(self):
|
||||
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
|
||||
device_id=2, mac_address="AA:BB:CC:DD:EE:FF",
|
||||
backend_key="foo"))
|
||||
with self._stubs(port=port["port"]) as (db_port_del, driver_port_del):
|
||||
self.plugin.delete_port(self.context, 1)
|
||||
self.assertTrue(db_port_del.called)
|
||||
driver_port_del.assert_called_with(self.context, "foo")
|
||||
|
||||
def test_port_delete_port_not_found_fails(self):
|
||||
with self._stubs(port=None) as (db_port_del, driver_port_del):
|
||||
with self.assertRaises(exceptions.PortNotFound):
|
||||
self.plugin.delete_port(self.context, 1)
|
||||
|
2
tox.ini
2
tox.ini
@@ -30,7 +30,7 @@ setenv = VIRTUAL_ENV={envdir}
|
||||
NOSE_COVER_HTML=1
|
||||
NOSE_COVER_HTML_DIR=.cover-report
|
||||
NOSE_COVER_MIN_PERCENTAGE=70
|
||||
commands = nosetests --cover-package=quark {posargs}
|
||||
commands = nosetests --cover-package=quark --cover-erase {posargs}
|
||||
|
||||
[testenv:venv]
|
||||
commands = {posargs}
|
||||
|
Reference in New Issue
Block a user