Rajaram/Vinkesh | Switched to sqlite for tests. Cleaned up migration scripts.

This commit is contained in:
Rajaram Mallya
2011-08-25 14:50:09 +05:30
parent 54c93268d0
commit cbd9041219
31 changed files with 55 additions and 338 deletions

View File

@@ -19,10 +19,8 @@
SQLAlchemy models for Melange data
"""
from datetime import timedelta
import netaddr
from netaddr import IPAddress
from netaddr import IPNetwork
from datetime import timedelta
from netaddr.strategy.ipv6 import ipv6_verbose
from openstack.common.utils import bool_from_string
@@ -300,14 +298,14 @@ class IpBlock(ModelBase):
@property
def broadcast(self):
return str(IPNetwork(self.cidr).broadcast)
return str(netaddr.IPNetwork(self.cidr).broadcast)
@property
def netmask(self):
return str(IPNetwork(self.cidr).netmask)
return str(netaddr.IPNetwork(self.cidr).netmask)
def is_ipv6(self):
return IPNetwork(self.cidr).version == 6
return netaddr.IPNetwork(self.cidr).version == 6
def subnets(self):
return IpBlock.find_all(parent_id=self.id).all()
@@ -373,7 +371,7 @@ class IpBlock(ModelBase):
unavailable_addresses = allocated_addresses + [self.gateway,
self.broadcast]
policy = self.policy()
for ip in IPNetwork(self.cidr):
for ip in netaddr.IPNetwork(self.cidr):
if (IpBlock.allowed_by_policy(self, policy, str(ip))
and (str(ip) not in unavailable_addresses)):
return str(ip)
@@ -395,11 +393,11 @@ class IpBlock(ModelBase):
_("Block policy does not allow this address"))
def contains(self, address):
return netaddr.IPAddress(address) in IPNetwork(self.cidr)
return netaddr.IPAddress(address) in netaddr.IPNetwork(self.cidr)
def _overlaps(self, other_block):
network = IPNetwork(self.cidr)
other_network = IPNetwork(other_block.cidr)
network = netaddr.IPNetwork(self.cidr)
other_network = netaddr.IPNetwork(other_block.cidr)
return network in other_network or other_network in network
def find_allocated_ip(self, address):
@@ -434,14 +432,15 @@ class IpBlock(ModelBase):
def _has_valid_cidr(self):
try:
IPNetwork(self.cidr)
netaddr.IPNetwork(self.cidr)
return True
except Exception:
return False
def _validate_cidr_is_within_parent_block_cidr(self):
parent = self.parent
if parent and IPNetwork(self.cidr) not in IPNetwork(parent.cidr):
if (parent and netaddr.IPNetwork(self.cidr) not in
netaddr.IPNetwork(parent.cidr)):
self._add_error('cidr',
_("cidr should be within parent block's cidr"))
@@ -527,13 +526,13 @@ class IpBlock(ModelBase):
def _convert_cidr_to_lowest_address(self):
if self._has_valid_cidr():
self.cidr = str(IPNetwork(self.cidr).cidr)
self.cidr = str(netaddr.IPNetwork(self.cidr).cidr)
def _before_validate(self):
self._convert_cidr_to_lowest_address()
def _before_save(self):
self.gateway = self.gateway or str(IPNetwork(self.cidr)[1])
self.gateway = self.gateway or str(netaddr.IPNetwork(self.cidr)[1])
self.dns1 = self.dns1 or Config.get("dns1")
self.dns2 = self.dns2 or Config.get("dns2")
@@ -551,7 +550,7 @@ class IpAddress(ModelBase):
@classmethod
def _formatted(cls, address):
return IPAddress(address).format(dialect=ipv6_verbose)
return netaddr.IPAddress(address).format(dialect=ipv6_verbose)
@classmethod
def find_all_by_network(cls, network_id, **conditions):
@@ -599,7 +598,7 @@ class IpAddress(ModelBase):
@property
def version(self):
return IPAddress(self.address).version
return netaddr.IPAddress(self.address).version
def data(self, **options):
data = super(IpAddress, self).data(**options)
@@ -665,7 +664,8 @@ class IpRange(ModelBase):
and end_index >= 0)
if end_index_overshoots_length_for_negative_offset:
end_index = None
return IPAddress(address) in IPNetwork(cidr)[self.offset:end_index]
return (netaddr.IPAddress(address) in
netaddr.IPNetwork(cidr)[self.offset:end_index])
def _validate(self):
self._validate_positive_integer('length')
@@ -681,7 +681,7 @@ class IpOctet(ModelBase):
return cls.find_all(policy_id=policy_id)
def applies_to(self, address):
return self.octet == IPAddress(address).words[-1]
return self.octet == netaddr.IPAddress(address).words[-1]
class Network(ModelBase):