Add length property to range types

This commit is contained in:
Konsta Vesterinen
2015-05-22 11:30:59 +03:00
parent f7cb3832bd
commit f91ffb1ea6
7 changed files with 285 additions and 2 deletions

View File

@@ -4,6 +4,12 @@ Changelog
Here you can see the full list of changes between each SQLAlchemy-Utils release.
0.30.3 (2015-05-22)
^^^^^^^^^^^^^^^^^^^
- Added length property to range types
0.30.2 (2015-05-21)
^^^^^^^^^^^^^^^^^^^

View File

@@ -86,4 +86,4 @@ from .types import ( # noqa
WeekDaysType
)
__version__ = '0.30.2'
__version__ = '0.30.3'

View File

@@ -115,10 +115,27 @@ Membership operators
~ Car.price_range.in_([[300, 400], [700, 800]])
Length
^^^^^^
SQLAlchemy-Utils provides length property for all range types. The
implementation of this property varies on different range types.
In the following example we find all cars whose price range's length is more
than 500.
::
session.query(Car).filter(
Car.price_range.length > 500
)
.. _intervals: https://github.com/kvesteri/intervals
"""
from collections import Iterable
from datetime import timedelta
import six
import sqlalchemy as sa
@@ -206,6 +223,26 @@ class RangeComparator(types.TypeEngine.Comparator):
return self.op('<@')(other)
class DiscreteRangeComparator(RangeComparator):
@property
def length(self):
return sa.func.upper(self.expr) - self.step - sa.func.lower(self.expr)
class IntRangeComparator(DiscreteRangeComparator):
step = 1
class DateRangeComparator(DiscreteRangeComparator):
step = timedelta(days=1)
class ContinuousRangeComparator(RangeComparator):
@property
def length(self):
return sa.func.upper(self.expr) - sa.func.lower(self.expr)
funcs = [
'__eq__',
'__ne__',
@@ -312,6 +349,7 @@ class IntRangeType(RangeType):
# '30-140'
"""
impl = INT4RANGE
comparator_factory = IntRangeComparator
def __init__(self, *args, **kwargs):
super(IntRangeType, self).__init__(*args, **kwargs)
@@ -337,6 +375,7 @@ class DateRangeType(RangeType):
during = sa.Column(DateRangeType)
"""
impl = DATERANGE
comparator_factory = DateRangeComparator
def __init__(self, *args, **kwargs):
super(DateRangeType, self).__init__(*args, **kwargs)
@@ -363,6 +402,7 @@ class NumericRangeType(RangeType):
"""
impl = NUMRANGE
comparator_factory = ContinuousRangeComparator
def __init__(self, *args, **kwargs):
super(NumericRangeType, self).__init__(*args, **kwargs)
@@ -371,6 +411,7 @@ class NumericRangeType(RangeType):
class DateTimeRangeType(RangeType):
impl = TSRANGE
comparator_factory = ContinuousRangeComparator
def __init__(self, *args, **kwargs):
super(DateTimeRangeType, self).__init__(*args, **kwargs)

View File

@@ -0,0 +1,99 @@
from datetime import datetime, timedelta
import sqlalchemy as sa
from pytest import mark
from sqlalchemy_utils import DateRangeType
from tests import TestCase
intervals = None
try:
import intervals
from infinity import inf
except ImportError:
pass
@mark.skipif('intervals is None')
class DateRangeTestCase(TestCase):
def create_models(self):
class Booking(self.Base):
__tablename__ = 'booking'
id = sa.Column(sa.Integer, primary_key=True)
during = sa.Column(DateRangeType)
self.Booking = Booking
def create_booking(self, date_range):
booking = self.Booking(
during=date_range
)
self.session.add(booking)
self.session.commit()
return self.session.query(self.Booking).first()
def test_nullify_range(self):
booking = self.create_booking(None)
assert booking.during is None
@mark.parametrize(
('date_range'),
(
[datetime(2015, 1, 1).date(), datetime(2015, 1, 3).date()],
[datetime(2015, 1, 1).date(), inf],
[-inf, datetime(2015, 1, 1).date()]
)
)
def test_save_date_range(self, date_range):
booking = self.create_booking(date_range)
assert booking.during.lower == date_range[0]
assert booking.during.upper == date_range[1]
def test_nullify_date_range(self):
booking = self.Booking(
during=intervals.DateInterval(
[datetime(2015, 1, 1).date(), datetime(2015, 1, 3).date()]
)
)
self.session.add(booking)
self.session.commit()
booking = self.session.query(self.Booking).first()
booking.during = None
self.session.commit()
booking = self.session.query(self.Booking).first()
assert booking.during is None
def test_integer_coercion(self):
booking = self.Booking(during=datetime(2015, 1, 1).date())
assert booking.during.lower == datetime(2015, 1, 1).date()
assert booking.during.upper == datetime(2015, 1, 1).date()
class TestDateRangeOnPostgres(DateRangeTestCase):
dns = 'postgres://postgres@localhost/sqlalchemy_utils_test'
@mark.parametrize(
('date_range', 'length'),
(
(
[datetime(2015, 1, 1).date(), datetime(2015, 1, 3).date()],
timedelta(days=2)
),
(
[datetime(2015, 1, 1).date(), datetime(2015, 1, 1).date()],
timedelta(days=0)
),
([-inf, datetime(2015, 1, 1).date()], None),
([datetime(2015, 1, 1).date(), inf], None),
)
)
def test_length(self, date_range, length):
self.create_booking(date_range)
query = (
self.session.query(self.Booking.during.length)
)
assert query.scalar() == length

View File

@@ -0,0 +1,99 @@
from datetime import datetime, timedelta
import sqlalchemy as sa
from pytest import mark
from sqlalchemy_utils import DateTimeRangeType
from tests import TestCase
intervals = None
try:
import intervals
from infinity import inf
except ImportError:
pass
@mark.skipif('intervals is None')
class DateRangeTestCase(TestCase):
def create_models(self):
class Booking(self.Base):
__tablename__ = 'booking'
id = sa.Column(sa.Integer, primary_key=True)
during = sa.Column(DateTimeRangeType)
self.Booking = Booking
def create_booking(self, date_range):
booking = self.Booking(
during=date_range
)
self.session.add(booking)
self.session.commit()
return self.session.query(self.Booking).first()
def test_nullify_range(self):
booking = self.create_booking(None)
assert booking.during is None
@mark.parametrize(
('date_range'),
(
[datetime(2015, 1, 1), datetime(2015, 1, 3)],
[datetime(2015, 1, 1), inf],
[-inf, datetime(2015, 1, 1)]
)
)
def test_save_date_range(self, date_range):
booking = self.create_booking(date_range)
assert booking.during.lower == date_range[0]
assert booking.during.upper == date_range[1]
def test_nullify_date_range(self):
booking = self.Booking(
during=intervals.DateInterval(
[datetime(2015, 1, 1), datetime(2015, 1, 3)]
)
)
self.session.add(booking)
self.session.commit()
booking = self.session.query(self.Booking).first()
booking.during = None
self.session.commit()
booking = self.session.query(self.Booking).first()
assert booking.during is None
def test_integer_coercion(self):
booking = self.Booking(during=datetime(2015, 1, 1))
assert booking.during.lower == datetime(2015, 1, 1)
assert booking.during.upper == datetime(2015, 1, 1)
class TestDateRangeOnPostgres(DateRangeTestCase):
dns = 'postgres://postgres@localhost/sqlalchemy_utils_test'
@mark.parametrize(
('date_range', 'length'),
(
(
[datetime(2015, 1, 1), datetime(2015, 1, 3)],
timedelta(days=2)
),
(
[datetime(2015, 1, 1), datetime(2015, 1, 1)],
timedelta(days=0)
),
([-inf, datetime(2015, 1, 1)], None),
([datetime(2015, 1, 1), inf], None),
)
)
def test_length(self, date_range, length):
self.create_booking(date_range)
query = (
self.session.query(self.Booking.during.length)
)
assert query.scalar() == length

View File

@@ -113,6 +113,25 @@ class TestIntRangeTypeOnPostgres(NumberRangeTestCase):
)
assert query.count()
@mark.parametrize(
('number_range', 'length'),
(
([1, 3], 2),
([1, 1], 0),
([-1, 1], 2),
([-inf, 1], None),
([0, inf], None),
([0, 0], 0),
([-3, -1], 2)
)
)
def test_length(self, number_range, length):
self.create_building(number_range)
query = (
self.session.query(self.Building.persons_at_night.length)
)
assert query.scalar() == length
@mark.parametrize(
'number_range',
(

View File

@@ -61,7 +61,7 @@ class NumericRangeTestCase(TestCase):
def test_nullify_number_range(self):
car = self.Car(
price_range=intervals.IntInterval([1, 3])
price_range=intervals.DecimalInterval([1, 3])
)
self.session.add(car)
@@ -87,6 +87,25 @@ class NumericRangeTestCase(TestCase):
class TestNumericRangeOnPostgres(NumericRangeTestCase):
dns = 'postgres://postgres@localhost/sqlalchemy_utils_test'
@mark.parametrize(
('number_range', 'length'),
(
([1, 3], 2),
([1, 1], 0),
([-1, 1], 2),
([-inf, 1], None),
([0, inf], None),
([0, 0], 0),
([-3, -1], 2)
)
)
def test_length(self, number_range, length):
self.create_car(number_range)
query = (
self.session.query(self.Car.price_range.length)
)
assert query.scalar() == length
@mark.skipif('intervals is None')
class TestNumericRangeWithStep(TestCase):