Add custom SQLAlchemy type for CIDR

Change-Id: I8e67012179d9cc6d27a2fdf47477857609c4c856
This commit is contained in:
Artur Korzeniewski 2016-03-15 14:55:38 +01:00
parent 8785edc0e7
commit 4f995c3cb0
3 changed files with 74 additions and 0 deletions

View File

@ -31,3 +31,19 @@ class IPAddress(types.TypeDecorator):
'value': value})
return str(value)
class CIDR(types.TypeDecorator):
impl = types.String(64)
def process_result_value(self, value, dialect):
return netaddr.IPNetwork(value)
def process_bind_param(self, value, dialect):
if not isinstance(value, netaddr.IPNetwork):
raise AttributeError(_("Received type '%(type)s' and value "
"'%(value)s'. Expecting netaddr.IPNetwork "
"type.") % {'type': type(value),
'value': value})
return str(value)

View File

@ -197,6 +197,14 @@ def get_random_integer(range_begin=0, range_end=1000):
return random.randint(range_begin, range_end)
def get_random_cidr(version=4):
if version == 4:
return '10.%d.%d.0/%d' % (random.randint(3, 254),
random.randint(3, 254),
24)
return '2001:db8:%x::/&d' % (random.getrandbits(16), 64)
def is_bsd():
"""Return True on BSD-based systems."""

View File

@ -15,11 +15,13 @@ import netaddr
from oslo_db import exception
from oslo_db.sqlalchemy import test_base
from oslo_utils import uuidutils
import six
import sqlalchemy as sa
from neutron import context
from neutron.db import sqlalchemytypes
from neutron.tests import tools
@six.add_metaclass(abc.ABCMeta)
@ -127,3 +129,51 @@ class IPAddressTestCase(SqlAlchemyTypesBaseTestCase):
'ip': "2120::ffff:ffff:ffff:ffff"}
]
self._test_multiple_create(ip_addresses)
class CIDRTestCase(SqlAlchemyTypesBaseTestCase):
def _get_test_table(self, meta):
return sa.Table(
'fakecidrmodels',
meta,
sa.Column('id', sa.String(36), primary_key=True, nullable=False),
sa.Column('cidr', sqlalchemytypes.CIDR)
)
def _get_one(self, value):
row_select = self.test_table.select().\
where(self.test_table.c.cidr == value)
return self.engine.execute(row_select).first()
def _update_row(self, key, cidr):
self.engine.execute(
self.test_table.update().values(cidr=cidr).
where(self.test_table.c.cidr == key))
def test_crud(self):
cidrs = ["10.0.0.0/24", "10.123.250.9/32", "2001:db8::/42",
"fe80::21e:67ff:fed0:56f0/64"]
for cidr_str in cidrs:
cidr = netaddr.IPNetwork(cidr_str)
self._add_row(id=uuidutils.generate_uuid(), cidr=cidr)
obj = self._get_one(cidr)
self.assertEqual(cidr, obj['cidr'])
random_cidr = netaddr.IPNetwork(tools.get_random_cidr())
self._update_row(cidr, random_cidr)
obj = self._get_one(random_cidr)
self.assertEqual(random_cidr, obj['cidr'])
objs = self._get_all()
self.assertEqual(len(cidrs), len(objs))
self._delete_rows()
objs = self._get_all()
self.assertEqual(0, len(objs))
def test_wrong_cidr(self):
wrong_cidrs = ["10.500.5.0/24", "10.0.0.1/40", "10.0.0.10.0/24",
"cidr", "", '2001:db8:5000::/64', '2001:db8::/130']
for cidr in wrong_cidrs:
self.assertRaises(exception.DBError, self._add_row,
id=uuidutils.generate_uuid(), cidr=cidr)