from pytest import raises import sqlalchemy as sa import sqlalchemy.ext.associationproxy from sqlalchemy.ext.associationproxy import ( AssociationProxy, _AssociationDict ) from sqlalchemy.orm.collections import ( attribute_mapped_collection, collection, MappedCollection, ) from sqlalchemy_utils import MetaType, MetaValue from tests import TestCase class TestMetaModel(TestCase): def create_models(self): class Question(self.Base): __tablename__ = 'question' id = sa.Column(sa.Integer, primary_key=True) data_type = sa.Column( MetaType({ 'str': sa.String, 'unicode': sa.UnicodeText, 'int': sa.Integer, 'datetime': sa.DateTime }) ) class Answer(self.Base): __tablename__ = 'answer' id = sa.Column(sa.Integer, primary_key=True) value = MetaValue('question', 'data_type') question_id = sa.Column(sa.Integer, sa.ForeignKey(Question.id)) question = sa.orm.relationship(Question) self.Question = Question self.Answer = Answer def test_meta_type_conversion(self): question = self.Question(data_type=sa.String(200)) self.session.add(question) self.session.commit() self.session.refresh(question) assert question.data_type.__name__ == 'String' def test_auto_generates_meta_value_columns(self): assert hasattr(self.Answer, 'value_str') assert hasattr(self.Answer, 'value_int') assert hasattr(self.Answer, 'value_datetime') def test_meta_value_setting(self): question = self.Question(data_type=sa.String) answer = self.Answer(question=question) answer.value = 'some answer' assert answer.value == answer.value_str == 'some answer' class MetaTypedCollection(MappedCollection): def __init__(self): self.keyfunc = lambda value: value.attr.name def __getitem__(self, key): if not self.key_exists(key): raise KeyError(key) return self.get(key) def key_exists(self, key): adapter = self._sa_adapter obj = adapter.owner_state.object return obj.category and key in obj.category.attributes @collection.appender @collection.internally_instrumented def set(self, *args, **kwargs): if len(args) > 1: if not self.key_exists(args[0]): raise KeyError(args[0]) arg = args[1] else: arg = args[0] super(MetaTypedCollection, self).set(arg, **kwargs) @collection.remover @collection.internally_instrumented def remove(self, key): del self[key] def assoc_dict_factory(lazy_collection, creator, getter, setter, parent): if isinstance(parent, MetaAssociationProxy): return MetaAssociationDict( lazy_collection, creator, getter, setter, parent ) else: return _AssociationDict( lazy_collection, creator, getter, setter, parent ) sqlalchemy.ext.associationproxy._AssociationDict = assoc_dict_factory class MetaAssociationDict(_AssociationDict): def _create(self, key, value): parent_obj = self.lazy_collection.ref() class_ = parent_obj.__mapper__.relationships[ self.lazy_collection.target ].mapper.class_ if not parent_obj.category: raise KeyError(key) return class_(attr=parent_obj.category.attributes[key], value=value) def _get(self, object): if object is None: return None return self.getter(object) class MetaAssociationProxy(AssociationProxy): pass class TestProductCatalog(TestCase): def create_models(self): class Category(self.Base): __tablename__ = 'category' id = sa.Column(sa.Integer, primary_key=True) name = sa.Column(sa.Unicode(255)) class Product(self.Base): __tablename__ = 'product' id = sa.Column(sa.Integer, primary_key=True) name = sa.Column(sa.Unicode(255)) category_id = sa.Column( sa.Integer, sa.ForeignKey(Category.id) ) category = sa.orm.relationship(Category) attributes = MetaAssociationProxy( 'attribute_objects', 'value', ) class Attribute(self.Base): __tablename__ = 'attribute' id = sa.Column(sa.Integer, primary_key=True) data_type = sa.Column( MetaType({ 'unicode': sa.UnicodeText, 'int': sa.Integer, 'datetime': sa.DateTime }) ) name = sa.Column(sa.Unicode(255)) category_id = sa.Column( sa.Integer, sa.ForeignKey(Category.id) ) category = sa.orm.relationship( Category, backref=sa.orm.backref( 'attributes', collection_class=attribute_mapped_collection('name') ) ) class AttributeValue(self.Base): __tablename__ = 'attribute_value' id = sa.Column(sa.Integer, primary_key=True) product_id = sa.Column( sa.Integer, sa.ForeignKey(Product.id) ) product = sa.orm.relationship( Product, backref=sa.orm.backref( 'attribute_objects', collection_class=MetaTypedCollection ) ) attr_id = sa.Column( sa.Integer, sa.ForeignKey(Attribute.id) ) attr = sa.orm.relationship(Attribute) value = MetaValue('attr', 'data_type') def repr(self): return self.value self.Product = Product self.Category = Category self.Attribute = Attribute self.AttributeValue = AttributeValue def test_attr_value_setting(self): attr = self.Attribute(data_type=sa.UnicodeText) value = self.AttributeValue(attr=attr) value.value = u'some answer' assert u'some answer' == value.value_unicode def test_unknown_attribute_key(self): product = self.Product() with raises(KeyError): product.attributes[u'color'] = u'red' def test_get_value_returns_none_for_existing_attr(self): category = self.Category(name=u'cars') category.attributes = { u'color': self.Attribute(name=u'color', data_type=sa.UnicodeText), u'maxspeed': self.Attribute(name=u'maxspeed', data_type=sa.Integer) } product = self.Product( name=u'Porsche 911', category=category ) self.session.add(product) self.session.commit() assert product.attributes[u'color'] is None def test_product_attribute_setting(self): category = self.Category(name=u'cars') category.attributes = { u'color': self.Attribute(name=u'color', data_type=sa.UnicodeText), u'maxspeed': self.Attribute(name=u'maxspeed', data_type=sa.Integer) } product = self.Product( name=u'Porsche 911', category=category ) self.session.add(product) self.session.commit() product.attribute_objects[u'color'] = self.AttributeValue( attr=category.attributes['color'], value=u'red' ) product.attribute_objects[u'maxspeed'] = self.AttributeValue( attr=category.attributes['maxspeed'], value=300 ) assert product.attribute_objects[u'color'].value_unicode == u'red' assert product.attribute_objects[u'maxspeed'].value_int == 300 self.session.commit() assert product.attribute_objects[u'color'].value == u'red' assert product.attribute_objects[u'maxspeed'].value == 300 def test_association_proxies(self): category = self.Category(name=u'cars') category.attributes = { u'color': self.Attribute(name=u'color', data_type=sa.UnicodeText), u'maxspeed': self.Attribute(name=u'maxspeed', data_type=sa.Integer) } product = self.Product( name=u'Porsche 911', category=category ) self.session.add(product) self.session.commit() product.attributes[u'color'] = u'red' product.attributes[u'maxspeed'] = 300 assert product.attributes[u'color'] == u'red' assert product.attributes[u'maxspeed'] == 300 self.session.commit() assert product.attributes[u'color'] == u'red' assert product.attributes[u'maxspeed'] == 300 # def test_dynamic_hybrid_properties(self): # category = self.Category(name=u'cars') # category.attributes = { # u'color': self.Attribute(name=u'color', data_type=sa.UnicodeText), # u'maxspeed': self.Attribute(name=u'maxspeed', data_type=sa.Integer) # } # product = self.Product( # name=u'Porsche 911', # category=category # ) # self.session.add(product) # product.attributes[u'color'] = u'red' # product.attributes[u'maxspeed'] = 300 # self.session.commit() # ( # self.session.query(self.Product) # .filter(self.Product.attributes['color'].in_([u'red', u'blue'])) # .order_by(self.Product.attributes['color']) # )