Adds more coverage to quark plugin modules

This commit is contained in:
Matt Dietz
2013-07-25 21:10:34 +00:00
parent bba3f4193e
commit 0fca3a3fea
10 changed files with 205 additions and 15 deletions

View File

@@ -13,8 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# Import neutron to ensure gettext is installed for "_" support from neutron.openstack.common.gettextutils import _
import neutron # noqa
from oslo.config import cfg from oslo.config import cfg
CONF = cfg.CONF CONF = cfg.CONF

View File

@@ -57,6 +57,10 @@ class BaseDriver(object):
LOG.info("Deleting security profile %s for tenant %s" % LOG.info("Deleting security profile %s for tenant %s" %
(group_id, context.tenant_id)) (group_id, context.tenant_id))
def update_security_group(self, context, group_id, **kwargs):
LOG.info("Updating security profile %s for tenant %s" %
(group_id, context.tenant_id))
def create_security_group_rule(self, context, group_id, rule): def create_security_group_rule(self, context, group_id, rule):
LOG.info("Creating security rule on group %s for tenant %s" % LOG.info("Creating security rule on group %s for tenant %s" %
(group_id, context.tenant_id)) (group_id, context.tenant_id))

View File

@@ -175,19 +175,15 @@ def post_update_port(context, id, port):
if "ip_id" in ip: if "ip_id" in ip:
ip_id = ip["ip_id"] ip_id = ip["ip_id"]
address = db_api.ip_address_find( address = db_api.ip_address_find(
context, context, id=ip_id, tenant_id=context.tenant_id,
id=ip_id,
tenant_id=context.tenant_id,
scope=db_api.ONE) scope=db_api.ONE)
elif "ip_address" in ip: elif "ip_address" in ip:
ip_address = ip["ip_address"] ip_address = ip["ip_address"]
net_address = netaddr.IPAddress(ip_address) net_address = netaddr.IPAddress(ip_address)
address = db_api.ip_address_find( address = db_api.ip_address_find(
context, context, ip_address=net_address,
ip_address=net_address,
network_id=port_db["network_id"], network_id=port_db["network_id"],
tenant_id=context.tenant_id, tenant_id=context.tenant_id, scope=db_api.ONE)
scope=db_api.ONE)
if not address: if not address:
address = ipam_driver.allocate_ip_address( address = ipam_driver.allocate_ip_address(
context, port_db["network_id"], id, context, port_db["network_id"], id,

View File

@@ -214,7 +214,7 @@ def get_security_group_rules(context, filters=None, fields=None,
page_reverse=False): page_reverse=False):
LOG.info("get_security_group_rules for tenant %s" % LOG.info("get_security_group_rules for tenant %s" %
(context.tenant_id)) (context.tenant_id))
rules = db_api.security_group_rule_find(context, filters=filters) rules = db_api.security_group_rule_find(context, **filters)
return [v._make_security_group_rule_dict(rule) for rule in rules] return [v._make_security_group_rule_dict(rule) for rule in rules]

View File

@@ -151,6 +151,19 @@ class TestQuarkCreateIpPolicies(test_quark_plugin.TestQuarkPlugin):
self.assertIsNone(resp["network_id"]) self.assertIsNone(resp["network_id"])
self.assertEqual(resp["exclude"], ["1.1.1.1/24"]) self.assertEqual(resp["exclude"], ["1.1.1.1/24"])
def test_create_ip_policy(self):
ipp = dict(subnet_id=1, network_id=None, id=1,
exclude=[dict(address=int(netaddr.IPAddress("1.1.1.1")),
prefix=24)])
with self._stubs(ipp, subnet=dict(id=1, ip_policy=None)):
resp = self.plugin.create_ip_policy(self.context, dict(
ip_policy=dict(subnet_id=1,
exclude=["1.1.1.1/24"])))
self.assertEqual(len(resp.keys()), 4)
self.assertEqual(resp["subnet_id"], 1)
self.assertIsNone(resp["network_id"])
self.assertEqual(resp["exclude"], ["1.1.1.1/24"])
class TestQuarkDeleteIpPolicies(test_quark_plugin.TestQuarkPlugin): class TestQuarkDeleteIpPolicies(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager @contextlib.contextmanager

View File

@@ -213,3 +213,20 @@ class TestQuarkCreateNetwork(test_quark_plugin.TestQuarkPlugin):
self.assertEqual(net["subnets"], []) self.assertEqual(net["subnets"], [])
self.assertEqual(net["shared"], False) self.assertEqual(net["shared"], False)
self.assertEqual(net["tenant_id"], 0) self.assertEqual(net["tenant_id"], 0)
def test_create_network_with_subnets(self):
subnet = dict(id=2, cidr="172.168.0.0/24", tenant_id=0)
net = dict(id=1, name="public", admin_state_up=True,
tenant_id=0)
with self._stubs(net=net, subnet=subnet) as net_create:
net.update(dict(subnets=[dict(subnet=subnet)]))
net = self.plugin.create_network(self.context, dict(network=net))
self.assertTrue(net_create.called)
self.assertEqual(len(net.keys()), 7)
self.assertIsNotNone(net["id"])
self.assertEqual(net["name"], "public")
self.assertIsNone(net["admin_state_up"])
self.assertEqual(net["status"], "ACTIVE")
self.assertEqual(net["subnets"], [2])
self.assertEqual(net["shared"], False)
self.assertEqual(net["tenant_id"], 0)

View File

@@ -352,13 +352,19 @@ class TestQuarkUpdatePort(test_quark_plugin.TestQuarkPlugin):
class TestQuarkPostUpdatePort(test_quark_plugin.TestQuarkPlugin): class TestQuarkPostUpdatePort(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager @contextlib.contextmanager
def _stubs(self, port, addr, addr2=None): def _stubs(self, port, addr, addr2=None, port_ips=None):
port_model = None port_model = None
addr_model = None addr_model = None
addr_model2 = None addr_model2 = None
if port: if port:
port_model = models.Port() port_model = models.Port()
port_model.update(port) port_model.update(port)
if port_ips:
port_model["ip_addresses"] = []
for ip in port_ips:
ip_mod = models.IPAddress()
ip_mod.update(ip)
port_model["ip_addresses"].append(ip_mod)
if addr: if addr:
addr_model = models.IPAddress() addr_model = models.IPAddress()
addr_model.update(addr) addr_model.update(addr)
@@ -452,6 +458,24 @@ class TestQuarkPostUpdatePort(test_quark_plugin.TestQuarkPlugin):
self.assertEqual(response["fixed_ips"][0]["ip_address"], self.assertEqual(response["fixed_ips"][0]["ip_address"],
"192.168.1.101") "192.168.1.101")
def test_post_update_port_already_has_ip(self):
new_port_ip = dict(port=dict(fixed_ips=[dict()]))
port = dict(port=dict(network_id=1, tenant_id=self.context.tenant_id,
device_id=2))
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, deallocated=True)
port_ips = [ip]
with self._stubs(port=port, addr=ip, port_ips=port_ips) as (port_find,
alloc_ip,
ip_find):
response = self.plugin.post_update_port(self.context, 1,
new_port_ip)
self.assertEqual(port_find.call_count, 1)
self.assertEqual(alloc_ip.call_count, 1)
self.assertEqual(ip_find.call_count, 0)
self.assertEqual(response["fixed_ips"][0]["ip_address"],
"192.168.1.100")
class TestQuarkGetPortCount(test_quark_plugin.TestQuarkPlugin): class TestQuarkGetPortCount(test_quark_plugin.TestQuarkPlugin):
def test_get_port_count(self): def test_get_port_count(self):

View File

@@ -69,7 +69,7 @@ class TestQuarkCreateRoutes(test_quark_plugin.TestQuarkPlugin):
subnet = dict(id=2) subnet = dict(id=2)
create_route = dict(id=1, cidr="172.16.0.0/24", gateway="172.16.0.1", create_route = dict(id=1, cidr="172.16.0.0/24", gateway="172.16.0.1",
subnet_id=subnet["id"]) subnet_id=subnet["id"])
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1", route = dict(id=1, cidr="0.0.0.0/0", gateway="192.168.0.1",
subnet_id=subnet["id"]) subnet_id=subnet["id"])
with self._stubs(create_route=create_route, find_routes=[route], with self._stubs(create_route=create_route, find_routes=[route],
subnet=subnet): subnet=subnet):

View File

@@ -24,6 +24,128 @@ from quark.db import models
from quark.tests import test_quark_plugin from quark.tests import test_quark_plugin
class TestQuarkGetSecurityGroups(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, security_groups):
def _make_rules(rules):
rule_mods = []
for rule in rules:
r = models.SecurityGroupRule()
r.update(dict(id=rule))
rule_mods.append(r)
return rule_mods
if isinstance(security_groups, list):
for sg in security_groups:
sg["rules"] = _make_rules(sg["rules"])
elif security_groups:
security_groups["rules"] = _make_rules(security_groups["rules"])
with mock.patch("quark.db.api.security_group_find") as db_find:
db_find.return_value = security_groups
yield
def test_get_security_groups_list(self):
group = {"name": "foo", "description": "bar",
"tenant_id": self.context.tenant_id, "rules": [1]}
with self._stubs([group]):
result = self.plugin.get_security_groups(self.context, {})
group = result[0]
self.assertEqual("fake", group["tenant_id"])
self.assertEqual("foo", group["name"])
self.assertEqual("bar", group["description"])
rule = group["security_group_rules"][0]
self.assertEqual(1, rule)
def test_get_security_group(self):
group = {"name": "foo", "description": "bar",
"tenant_id": self.context.tenant_id, "rules": [1]}
with self._stubs(group):
result = self.plugin.get_security_group(self.context, 1)
self.assertEqual("fake", result["tenant_id"])
self.assertEqual("foo", result["name"])
self.assertEqual("bar", result["description"])
rule = result["security_group_rules"][0]
self.assertEqual(1, rule)
def test_get_security_group_group_not_found_fails(self):
with self._stubs(security_groups=None):
with self.assertRaises(sg_ext.SecurityGroupNotFound):
self.plugin.get_security_group(self.context, 1)
class TestQuarkGetSecurityGroupRules(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, security_rules):
if isinstance(security_rules, list):
rules = []
for rule in security_rules:
r = models.SecurityGroupRule()
r.update(rule)
rules.append(r)
elif security_rules is not None:
rules = models.SecurityGroupRule()
rules.update(security_rules)
with mock.patch("quark.db.api.security_group_rule_find") as db_find:
db_find.return_value = security_rules
yield
def test_get_security_group_rules(self):
rule = {"id": 1, "remote_group_id": 2, "direction": "ingress",
"port_range_min": 80, "port_range_max": 100,
"remote_ip_prefix": None, "ethertype": "IPv4",
"tenant_id": "foo", "protocol": "udp", "group_id": 1}
expected = rule.copy()
expected["security_group_id"] = expected.pop("group_id")
with self._stubs([rule]):
resp = self.plugin.get_security_group_rules(self.context, {})
for key in expected.keys():
self.assertTrue(key in resp[0])
self.assertEqual(resp[0][key], expected[key])
def test_get_security_group_rule(self):
rule = {"id": 1, "remote_group_id": 2, "direction": "ingress",
"port_range_min": 80, "port_range_max": 100,
"remote_ip_prefix": None, "ethertype": "IPv4",
"tenant_id": "foo", "protocol": "udp", "group_id": 1}
expected = rule.copy()
expected["security_group_id"] = expected.pop("group_id")
with self._stubs(rule):
resp = self.plugin.get_security_group_rule(self.context, 1)
for key in expected.keys():
self.assertTrue(key in resp)
self.assertEqual(resp[key], expected[key])
def test_get_security_group_rule_not_found(self):
with self._stubs(None):
with self.assertRaises(sg_ext.SecurityGroupRuleNotFound):
self.plugin.get_security_group_rule(self.context, 1)
class TestQuarkUpdateSecurityGroup(test_quark_plugin.TestQuarkPlugin):
def test_update_security_group(self):
rule = models.SecurityGroupRule()
rule.update(dict(id=1))
group = {"name": "foo", "description": "bar",
"tenant_id": self.context.tenant_id, "rules": [rule]}
updated_group = group.copy()
updated_group["name"] = "bar"
with contextlib.nested(
mock.patch("quark.db.api.security_group_find"),
mock.patch("quark.db.api.security_group_update"),
mock.patch(
"quark.drivers.base.BaseDriver.update_security_group")
) as (db_find, db_update, update_sg):
db_find.return_value = group
db_update.return_value = updated_group
update = dict(security_group=dict(name="bar"))
resp = self.plugin.update_security_group(self.context, 1, update)
self.assertEqual(resp["name"], updated_group["name"])
class TestQuarkCreateSecurityGroup(test_quark_plugin.TestQuarkPlugin): class TestQuarkCreateSecurityGroup(test_quark_plugin.TestQuarkPlugin):
def setUp(self, *args, **kwargs): def setUp(self, *args, **kwargs):
super(TestQuarkCreateSecurityGroup, self).setUp(*args, **kwargs) super(TestQuarkCreateSecurityGroup, self).setUp(*args, **kwargs)

View File

@@ -225,11 +225,22 @@ class TestQuarkCreateSubnetAllocationPools(test_quark_plugin.TestQuarkPlugin):
# * workaround is also in place for lame ATTR_NOT_SPECIFIED object() # * workaround is also in place for lame ATTR_NOT_SPECIFIED object()
class TestQuarkCreateSubnet(test_quark_plugin.TestQuarkPlugin): class TestQuarkCreateSubnet(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager @contextlib.contextmanager
def _stubs(self, subnet=None, network=None, routes=None, dns=None): def _stubs(self, subnet=None, network=None, routes=None, dns=None,
no_net_on_subnet_create=False):
if network:
net = models.Network()
net.update(network)
network = net
subnet_mod = models.Subnet(network=models.Network()) subnet_mod = models.Subnet(network=models.Network())
dns_ips = subnet.pop("dns_nameservers", []) dns_ips = subnet.pop("dns_nameservers", [])
host_routes = subnet.pop("host_routes", []) host_routes = subnet.pop("host_routes", [])
subnet_mod.update(subnet) subnet_mod.update(subnet)
#FIXME(anyone): coverage support for subnet->network backref hack
if no_net_on_subnet_create:
no_net_sub = models.Subnet()
no_net_sub.update(subnet)
subnet["dns_nameservers"] = dns_ips subnet["dns_nameservers"] = dns_ips
subnet["host_routes"] = host_routes subnet["host_routes"] = host_routes
routes = routes or [] routes = routes or []
@@ -243,7 +254,10 @@ class TestQuarkCreateSubnet(test_quark_plugin.TestQuarkPlugin):
mock.patch("quark.db.api.dns_create"), mock.patch("quark.db.api.dns_create"),
mock.patch("quark.db.api.route_create"), mock.patch("quark.db.api.route_create"),
) as (subnet_create, net_find, dns_create, route_create): ) as (subnet_create, net_find, dns_create, route_create):
subnet_create.return_value = subnet_mod if no_net_on_subnet_create:
subnet_create.return_value = no_net_sub
else:
subnet_create.return_value = subnet_mod
net_find.return_value = network net_find.return_value = network
route_create.side_effect = route_models route_create.side_effect = route_models
dns_create.side_effect = dns_models dns_create.side_effect = dns_models
@@ -363,7 +377,8 @@ class TestQuarkCreateSubnet(test_quark_plugin.TestQuarkPlugin):
with self._stubs( with self._stubs(
subnet=subnet["subnet"], subnet=subnet["subnet"],
network=network, network=network,
routes=routes routes=routes,
no_net_on_subnet_create=True
) as (subnet_create, dns_create, route_create): ) as (subnet_create, dns_create, route_create):
dns_nameservers = subnet["subnet"].pop("dns_nameservers") dns_nameservers = subnet["subnet"].pop("dns_nameservers")
subnet_request = copy.deepcopy(subnet) subnet_request = copy.deepcopy(subnet)