Remove batch fetch tests

This commit is contained in:
Konsta Vesterinen
2015-05-05 13:57:50 +03:00
parent 7b4817d2cd
commit e3944050f8
9 changed files with 0 additions and 887 deletions

View File

@@ -1,213 +0,0 @@
import sqlalchemy as sa
from sqlalchemy_utils import batch_fetch
from tests import TestCase
class TestCompoundOneToManyBatchFetching(TestCase):
def create_models(self):
class Building(self.Base):
__tablename__ = 'building'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
class BusinessPremise(self.Base):
__tablename__ = 'business_premise'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
building_id = sa.Column(sa.Integer, sa.ForeignKey(Building.id))
building = sa.orm.relationship(
Building,
backref=sa.orm.backref(
'business_premises'
)
)
class Equipment(self.Base):
__tablename__ = 'equipment'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
building_id = sa.Column(sa.Integer, sa.ForeignKey(Building.id))
business_premise_id = sa.Column(
sa.Integer, sa.ForeignKey(BusinessPremise.id)
)
building = sa.orm.relationship(
Building,
backref=sa.orm.backref(
'equipment'
)
)
business_premise = sa.orm.relationship(
BusinessPremise,
backref=sa.orm.backref(
'equipment'
)
)
self.Building = Building
self.BusinessPremise = BusinessPremise
self.Equipment = Equipment
def setup_method(self, method):
TestCase.setup_method(self, method)
self.buildings = [
self.Building(id=12, name=u'B 1'),
self.Building(id=15, name=u'B 2'),
self.Building(id=19, name=u'B 3'),
]
self.business_premises = [
self.BusinessPremise(
id=22, name=u'BP 1', building=self.buildings[0]
),
self.BusinessPremise(
id=33, name=u'BP 2', building=self.buildings[0]
),
self.BusinessPremise(
id=44, name=u'BP 3', building=self.buildings[2]
),
]
self.equipment = [
self.Equipment(
id=2, name=u'E 1', building=self.buildings[0]
),
self.Equipment(
id=4, name=u'E 2', building=self.buildings[2]
),
self.Equipment(
id=6, name=u'E 3', business_premise=self.business_premises[0]
),
self.Equipment(
id=8, name=u'E 4', business_premise=self.business_premises[2]
),
]
self.session.add_all(self.buildings)
self.session.add_all(self.business_premises)
self.session.add_all(self.equipment)
self.session.commit()
def test_compound_fetching(self):
buildings = self.session.query(self.Building).all()
batch_fetch(
buildings,
'business_premises',
(
'equipment',
'business_premises.equipment'
)
)
query_count = self.connection.query_count
assert len(buildings[0].equipment) == 1
assert buildings[0].equipment[0].name == 'E 1'
assert not buildings[1].equipment
assert buildings[0].business_premises[0].equipment
assert self.business_premises[2].equipment
assert self.business_premises[2].equipment[0].name == 'E 4'
assert self.connection.query_count == query_count
class TestCompoundManyToOneBatchFetching(TestCase):
def create_models(self):
class Equipment(self.Base):
__tablename__ = 'equipment'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
class Building(self.Base):
__tablename__ = 'building'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
equipment_id = sa.Column(sa.Integer, sa.ForeignKey(Equipment.id))
equipment = sa.orm.relationship(Equipment)
class BusinessPremise(self.Base):
__tablename__ = 'business_premise'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
building_id = sa.Column(sa.Integer, sa.ForeignKey(Building.id))
building = sa.orm.relationship(
Building,
backref=sa.orm.backref(
'business_premises'
)
)
equipment_id = sa.Column(sa.Integer, sa.ForeignKey(Equipment.id))
equipment = sa.orm.relationship(Equipment)
self.Building = Building
self.BusinessPremise = BusinessPremise
self.Equipment = Equipment
def setup_method(self, method):
TestCase.setup_method(self, method)
self.equipment = [
self.Equipment(
id=2, name=u'E 1',
),
self.Equipment(
id=4, name=u'E 2',
),
self.Equipment(
id=6, name=u'E 3',
),
self.Equipment(
id=8, name=u'E 4',
),
]
self.buildings = [
self.Building(id=12, name=u'B 1', equipment=self.equipment[0]),
self.Building(id=15, name=u'B 2', equipment=self.equipment[1]),
self.Building(id=19, name=u'B 3'),
]
self.business_premises = [
self.BusinessPremise(
id=22,
name=u'BP 1',
building=self.buildings[0]
),
self.BusinessPremise(
id=33,
name=u'BP 2',
building=self.buildings[0],
equipment=self.equipment[2]
),
self.BusinessPremise(
id=44,
name=u'BP 3',
building=self.buildings[2],
equipment=self.equipment[1]
),
]
self.session.add_all(self.buildings)
self.session.add_all(self.business_premises)
self.session.add_all(self.equipment)
self.session.commit()
def test_compound_fetching(self):
buildings = self.session.query(self.Building).all()
batch_fetch(
buildings,
'business_premises',
(
'equipment',
'business_premises.equipment'
)
)
query_count = self.connection.query_count
assert buildings[0].equipment.name == 'E 1'
assert buildings[1].equipment.name == 'E 2'
assert not buildings[2].equipment
assert not buildings[1].business_premises
assert buildings[2].business_premises[0].equipment.name == 'E 2'
assert self.connection.query_count == query_count

View File

@@ -1,135 +0,0 @@
import sqlalchemy as sa
from sqlalchemy_utils import batch_fetch, with_backrefs
from tests import TestCase
class TestBatchFetchDeepRelationships(TestCase):
def create_models(self):
class User(self.Base):
__tablename__ = 'user'
id = sa.Column(sa.Integer, autoincrement=True, primary_key=True)
name = sa.Column(sa.Unicode(255))
class Category(self.Base):
__tablename__ = 'category'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
class Article(self.Base):
__tablename__ = 'article'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
category_id = sa.Column(sa.Integer, sa.ForeignKey(Category.id))
category = sa.orm.relationship(
Category,
primaryjoin=category_id == Category.id,
backref=sa.orm.backref(
'articles'
)
)
article_tag = sa.Table(
'article_tag',
self.Base.metadata,
sa.Column(
'article_id',
sa.Integer,
sa.ForeignKey('article.id', ondelete='cascade')
),
sa.Column(
'tag_id',
sa.Integer,
sa.ForeignKey('tag.id', ondelete='cascade')
)
)
class Tag(self.Base):
__tablename__ = 'tag'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
articles = sa.orm.relationship(
Article,
secondary=article_tag,
backref=sa.orm.backref(
'tags'
)
)
self.User = User
self.Category = Category
self.Article = Article
self.Tag = Tag
def init_data(self):
articles = [
self.Article(name=u'Article 1'),
self.Article(name=u'Article 2'),
self.Article(name=u'Article 3'),
self.Article(name=u'Article 4'),
self.Article(name=u'Article 5')
]
self.session.add_all(articles)
self.session.flush()
tags = [
self.Tag(name=u'Tag 1'),
self.Tag(name=u'Tag 2'),
self.Tag(name=u'Tag 3')
]
articles[0].tags = tags
articles[3].tags = tags[1:]
category = self.Category(name=u'Category #1')
category.articles = articles[0:2]
category2 = self.Category(name=u'Category #2')
category2.articles = articles[2:]
self.session.add(category)
self.session.add(category2)
self.session.commit()
def test_supports_empty_related_entities(self):
category = self.Category(name=u'Category #1')
self.session.add(category)
self.session.commit()
categories = self.session.query(self.Category).all()
batch_fetch(
categories,
'articles',
'articles.tags'
)
query_count = self.connection.query_count
assert not categories[0].articles
assert self.connection.query_count == query_count
def test_deep_relationships(self):
self.init_data()
categories = self.session.query(self.Category).all()
batch_fetch(
categories,
'articles',
'articles.tags'
)
query_count = self.connection.query_count
categories[0].articles[0].tags
assert self.connection.query_count == query_count
categories[1].articles[1].tags
assert self.connection.query_count == query_count
def test_many_to_many_backref_population(self):
self.init_data()
categories = self.session.query(self.Category).all()
batch_fetch(
categories,
'articles',
with_backrefs('articles.tags'),
)
query_count = self.connection.query_count
tags = categories[0].articles[0].tags
tags2 = categories[1].articles[1].tags
tags[0].articles
tags2[0].articles
names = [article.name for article in tags[0].articles]
assert u'Article 1' in names
assert self.connection.query_count == query_count

View File

@@ -1,46 +0,0 @@
from __future__ import unicode_literals
import sqlalchemy as sa
from sqlalchemy_utils import batch_fetch, generic_relationship
from tests import TestCase
class TestBatchFetchGenericRelationship(TestCase):
def create_models(self):
class Building(self.Base):
__tablename__ = 'building'
id = sa.Column(sa.Integer, primary_key=True)
class User(self.Base):
__tablename__ = 'user'
id = sa.Column(sa.Integer, primary_key=True)
class Event(self.Base):
__tablename__ = 'event'
id = sa.Column(sa.Integer, primary_key=True)
object_type = sa.Column(sa.Unicode(255))
object_id = sa.Column(sa.Integer, nullable=False)
object = generic_relationship(object_type, object_id)
self.Building = Building
self.User = User
self.Event = Event
def test_batch_fetch(self):
user = self.User()
self.session.add(user)
self.session.commit()
event = self.Event(object=user)
self.session.add(event)
self.session.commit()
events = self.session.query(self.Event).all()
batch_fetch(events, 'object')
query_count = self.connection.query_count
events[0].object
assert self.connection.query_count == query_count

View File

@@ -1,163 +0,0 @@
import sqlalchemy as sa
from sqlalchemy_utils import batch_fetch
from tests import TestCase
class JoinTableInheritanceTestCase(TestCase):
def create_models(self):
class Category(self.Base):
__tablename__ = 'category'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
class User(self.Base):
__tablename__ = 'user'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
class TextItem(self.Base):
__tablename__ = 'text_item'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
category_id = sa.Column(
sa.Integer,
sa.ForeignKey(Category.id)
)
author_id = sa.Column(
sa.Integer,
sa.ForeignKey(User.id)
)
author = sa.orm.relationship(
User,
backref=sa.orm.backref(
'text_items'
)
)
type = sa.Column(sa.Unicode(255))
__mapper_args__ = {
'polymorphic_on': type
}
class Article(TextItem):
__tablename__ = 'article'
id = sa.Column(
sa.Integer, sa.ForeignKey(TextItem.id), primary_key=True
)
__mapper_args__ = {
'polymorphic_identity': u'article'
}
category = sa.orm.relationship(
Category,
backref=sa.orm.backref(
'articles'
)
)
class BlogPost(TextItem):
__tablename__ = 'blog_post'
id = sa.Column(
sa.Integer, sa.ForeignKey(TextItem.id), primary_key=True
)
__mapper_args__ = {
'polymorphic_identity': u'blog_post'
}
category = sa.orm.relationship(
Category,
backref=sa.orm.backref(
'blog_posts'
)
)
class Attachment(self.Base):
__tablename__ = 'attachment'
id = sa.Column(
sa.Integer, primary_key=True
)
name = sa.Column(
sa.Unicode(255), index=True
)
text_item_id = sa.Column(
sa.Integer,
sa.ForeignKey(TextItem.id),
)
text_item = sa.orm.relationship(TextItem, backref='attachments')
self.Article = Article
self.Attachment = Attachment
self.BlogPost = BlogPost
self.Category = Category
self.TextItem = TextItem
self.User = User
def setup_method(self, method):
TestCase.setup_method(self, method)
text_items = [
self.Article(name=u'Article 1'),
self.Article(name=u'Article 2'),
self.Article(name=u'Article 3'),
self.Article(name=u'Article 4'),
self.BlogPost(name=u'BlogPost 1'),
self.BlogPost(name=u'BlogPost 2'),
self.BlogPost(name=u'BlogPost 3'),
self.BlogPost(name=u'BlogPost 4')
]
self.session.add_all(text_items)
self.session.flush()
category = self.Category(name=u'Category #1')
category.articles = text_items[0:2]
category.blog_posts = text_items[4:7]
category2 = self.Category(name=u'Category #2')
category2.articles = text_items[2:4]
category2.blog_posts = text_items[-1:]
self.session.add(category)
self.session.add(category2)
text_items[0].attachments = [
self.Attachment(id=22, name=u'Attachment 1'),
self.Attachment(id=34, name=u'Attachment 2')
]
text_items[0].author = self.User(name=u'John Matrix')
text_items[1].author = self.User(name=u'John Doe')
self.session.commit()
class TestBatchFetchJoinTableInheritedModels(JoinTableInheritanceTestCase):
def test_multiple_relationships(self):
categories = self.session.query(self.Category).all()
batch_fetch(
categories,
'articles',
'blog_posts'
)
query_count = self.connection.query_count
categories[0].articles[1]
categories[0].blog_posts[0]
assert self.connection.query_count == query_count
categories[1].articles[1]
categories[1].blog_posts[0]
assert self.connection.query_count == query_count
def test_one_to_many_relationships(self):
articles = (
self.session.query(self.Article)
.filter_by(name=u'Article 1')
.all()
)
batch_fetch(
articles,
'attachments'
)
def test_many_to_one_relationships(self):
articles = (
self.session.query(self.Article)
.filter_by(name=u'Article 1')
.all()
)
batch_fetch(
articles,
'author'
)

View File

@@ -1,112 +0,0 @@
import sqlalchemy as sa
from sqlalchemy_utils import batch_fetch, with_backrefs
from tests import TestCase
class TestBatchFetchManyToManyCompositeRelationships(TestCase):
def create_models(self):
class Article(self.Base):
__tablename__ = 'article'
id1 = sa.Column(sa.Integer, primary_key=True)
id2 = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
article_tag = sa.Table(
'article_tag',
self.Base.metadata,
sa.Column(
'article_id1',
sa.Integer,
),
sa.Column(
'article_id2',
sa.Integer,
),
sa.Column(
'tag_id1',
sa.Integer,
),
sa.Column(
'tag_id2',
sa.Integer,
),
sa.ForeignKeyConstraint(
['article_id1', 'article_id2'],
['article.id1', 'article.id2']
),
sa.ForeignKeyConstraint(
['tag_id1', 'tag_id2'],
['tag.id1', 'tag.id2']
)
)
class Tag(self.Base):
__tablename__ = 'tag'
id1 = sa.Column(sa.Integer, primary_key=True)
id2 = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
articles = sa.orm.relationship(
Article,
secondary=article_tag,
backref=sa.orm.backref(
'tags',
),
)
self.Article = Article
self.Tag = Tag
def setup_method(self, method):
TestCase.setup_method(self, method)
articles = [
self.Article(id1=1, id2=2, name=u'Article 1'),
self.Article(id1=2, id2=2, name=u'Article 2'),
self.Article(id1=3, id2=3, name=u'Article 3'),
self.Article(id1=4, id2=3, name=u'Article 4'),
self.Article(id1=5, id2=3, name=u'Article 5')
]
self.session.add_all(articles)
self.session.flush()
tags = [
self.Tag(id1=1, id2=2, name=u'Tag 1'),
self.Tag(id1=2, id2=3, name=u'Tag 2'),
self.Tag(id1=3, id2=4, name=u'Tag 3')
]
articles[0].tags = tags
articles[3].tags = tags[1:]
self.session.commit()
def test_deep_relationships(self):
articles = (
self.session.query(self.Article)
.order_by(self.Article.id1).all()
)
batch_fetch(
articles,
'tags'
)
query_count = self.connection.query_count
assert articles[0].tags
articles[1].tags
assert articles[3].tags
assert self.connection.query_count == query_count
def test_many_to_many_backref_population(self):
articles = (
self.session.query(self.Article)
.order_by(self.Article.id1).all()
)
batch_fetch(
articles,
with_backrefs('tags'),
)
query_count = self.connection.query_count
tags = articles[0].tags
tags2 = articles[3].tags
tags[0].articles
tags2[0].articles
names = [article.name for article in tags[0].articles]
assert u'Article 1' in names
assert self.connection.query_count == query_count

View File

@@ -1,65 +0,0 @@
import sqlalchemy as sa
from sqlalchemy_utils import batch_fetch
from tests import TestCase
class TestBatchFetchManyToOneRelationships(TestCase):
def create_models(self):
class User(self.Base):
__tablename__ = 'user'
id = sa.Column(sa.Integer, autoincrement=True, primary_key=True)
name = sa.Column(sa.Unicode(255))
class Article(self.Base):
__tablename__ = 'article'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
author_id = sa.Column(sa.Integer, sa.ForeignKey(User.id))
author = sa.orm.relationship(
User,
backref=sa.orm.backref(
'articles'
)
)
self.User = User
self.Article = Article
def setup_method(self, method):
TestCase.setup_method(self, method)
self.users = [
self.User(id=333, name=u'John'),
self.User(id=334, name=u'Matt')
]
articles = [
self.Article(
id=1,
name=u'Article 1',
author=self.users[0]
),
self.Article(
id=2,
name=u'Article 2',
author=self.users[1]
),
self.Article(
id=3,
name=u'Article 3'
)
]
self.session.add_all(articles)
self.session.commit()
def test_supports_relationship_attributes(self):
articles = self.session.query(self.Article).all()
batch_fetch(
articles,
'author'
)
query_count = self.connection.query_count
assert articles[0].author == self.users[0] # no lazy load should occur
assert articles[1].author == self.users[1] # no lazy load should occur
assert articles[2].author is None # no lazy load should occur
assert self.connection.query_count == query_count

View File

@@ -1,74 +0,0 @@
import sqlalchemy as sa
from sqlalchemy_utils import batch_fetch
from tests import TestCase
class TestBatchFetchWithCompositeKeyRelationships(TestCase):
def create_models(self):
class User(self.Base):
__tablename__ = 'user'
first_name = sa.Column(sa.Unicode(255), primary_key=True)
last_name = sa.Column(sa.Unicode(255), primary_key=True)
class Article(self.Base):
__tablename__ = 'article'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
author_first_name = sa.Column(
sa.Unicode(255), sa.ForeignKey(User.first_name)
)
author_last_name = sa.Column(
sa.Unicode(255), sa.ForeignKey(User.last_name)
)
author = sa.orm.relationship(
User,
primaryjoin=sa.and_(
author_first_name == User.first_name,
author_last_name == User.last_name
),
backref=sa.orm.backref(
'articles'
)
)
self.User = User
self.Article = Article
def setup_method(self, method):
TestCase.setup_method(self, method)
self.users = [
self.User(first_name=u'John', last_name=u'Matrix'),
self.User(first_name=u'John', last_name=u'The Ripper')
]
articles = [
self.Article(
id=1,
name=u'Article 1',
author=self.users[0]
),
self.Article(
id=2,
name=u'Article 2',
author=self.users[1]
),
self.Article(
id=3,
name=u'Article 3'
)
]
self.session.add_all(articles)
self.session.commit()
def test_supports_relationship_attributes(self):
articles = self.session.query(self.Article).all()
batch_fetch(
articles,
'author'
)
query_count = self.connection.query_count
assert articles[0].author == self.users[0] # no lazy load should occur
assert articles[1].author == self.users[1] # no lazy load should occur
assert articles[2].author is None # no lazy load should occur
assert self.connection.query_count == query_count

View File

@@ -1,79 +0,0 @@
import sqlalchemy as sa
from pytest import raises
from sqlalchemy_utils import batch_fetch
from tests import TestCase
class TestBatchFetchOneToManyRelationships(TestCase):
def create_models(self):
class User(self.Base):
__tablename__ = 'user'
id = sa.Column(sa.Integer, autoincrement=True, primary_key=True)
name = sa.Column(sa.Unicode(255))
class Category(self.Base):
__tablename__ = 'category'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
class Article(self.Base):
__tablename__ = 'article'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
category_id = sa.Column(sa.Integer, sa.ForeignKey(Category.id))
category = sa.orm.relationship(
Category,
primaryjoin=category_id == Category.id,
backref=sa.orm.backref(
'articles'
)
)
self.User = User
self.Category = Category
self.Article = Article
def setup_method(self, method):
TestCase.setup_method(self, method)
articles = [
self.Article(name=u'Article 1'),
self.Article(name=u'Article 2'),
self.Article(name=u'Article 3'),
self.Article(name=u'Article 4'),
self.Article(name=u'Article 5')
]
self.session.add_all(articles)
self.session.flush()
category = self.Category(name=u'Category #1')
category.articles = articles[0:2]
category2 = self.Category(name=u'Category #2')
category2.articles = articles[2:]
self.session.add(category)
self.session.add(category2)
self.session.commit()
def test_raises_error_if_relationship_not_found(self):
categories = self.session.query(self.Category).all()
with raises(AttributeError):
batch_fetch(categories, 'unknown_relation')
def test_supports_relationship_attributes(self):
categories = self.session.query(self.Category).all()
batch_fetch(categories, self.Category.articles)
query_count = self.connection.query_count
articles = categories[0].articles # no lazy load should occur
assert len(articles) == 2
article_names = [article.name for article in articles]
assert 'Article 1' in article_names
assert 'Article 2' in article_names
articles = categories[1].articles # no lazy load should occur
assert len(articles) == 3
article_names = [article.name for article in articles]
assert 'Article 3' in article_names
assert 'Article 4' in article_names
assert 'Article 5' in article_names
assert self.connection.query_count == query_count