Move property observers out of generates
This commit is contained in:
@@ -54,72 +54,11 @@ class AttributeValueGenerator(object):
|
|||||||
"""
|
"""
|
||||||
Adds generator functions to generator_registry.
|
Adds generator functions to generator_registry.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
for generator in class_.__dict__.values():
|
for generator in class_.__dict__.values():
|
||||||
if hasattr(generator, '__generates__'):
|
if hasattr(generator, '__generates__'):
|
||||||
self.generator_registry[class_].append(generator)
|
self.generator_registry[class_].append(generator)
|
||||||
|
|
||||||
path = generator.__generates__[1]
|
|
||||||
column_key = generator.__generates__[0]
|
|
||||||
if not isinstance(column_key, six.string_types):
|
|
||||||
column_key = column_key.key
|
|
||||||
if path:
|
|
||||||
path = AttrPath(class_, path)
|
|
||||||
attr = getdotattr(class_, str(path))
|
|
||||||
|
|
||||||
if isinstance(attr.property, sa.orm.ColumnProperty):
|
|
||||||
for attr in path:
|
|
||||||
self.generate_property_observer(
|
|
||||||
path,
|
|
||||||
attr,
|
|
||||||
column_key
|
|
||||||
)
|
|
||||||
|
|
||||||
def generate_property_observer(self, path, attr, property_key):
|
|
||||||
"""
|
|
||||||
Generate SQLAlchemy listener that observes given attr within given
|
|
||||||
path space.
|
|
||||||
"""
|
|
||||||
@sa.event.listens_for(attr, 'set')
|
|
||||||
def receive_attr_set(target, value, oldvalue, initiator):
|
|
||||||
index = path.index(attr)
|
|
||||||
if not index:
|
|
||||||
setattr(
|
|
||||||
target,
|
|
||||||
property_key,
|
|
||||||
getdotattr(value, str(path[1:]))
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
inversed_path = ~path[0:-1]
|
|
||||||
if index == len(path) - 1:
|
|
||||||
entities = getdotattr(
|
|
||||||
target,
|
|
||||||
str(inversed_path)
|
|
||||||
)
|
|
||||||
assigned_value = value
|
|
||||||
else:
|
|
||||||
entities = getdotattr(
|
|
||||||
target,
|
|
||||||
str(inversed_path[index:])
|
|
||||||
)
|
|
||||||
assigned_value = getdotattr(value, str(path[(index + 1):]))
|
|
||||||
if entities:
|
|
||||||
if not isinstance(entities, list):
|
|
||||||
entities = [entities]
|
|
||||||
for entity in entities:
|
|
||||||
if isinstance(entity, list):
|
|
||||||
for e in entity:
|
|
||||||
setattr(
|
|
||||||
e,
|
|
||||||
property_key,
|
|
||||||
assigned_value
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
setattr(
|
|
||||||
entity,
|
|
||||||
property_key,
|
|
||||||
assigned_value
|
|
||||||
)
|
|
||||||
|
|
||||||
def update_generated_properties(self, session, ctx, instances):
|
def update_generated_properties(self, session, ctx, instances):
|
||||||
for obj in itertools.chain(session.new, session.dirty):
|
for obj in itertools.chain(session.new, session.dirty):
|
||||||
class_ = obj.__class__
|
class_ = obj.__class__
|
||||||
@@ -135,12 +74,6 @@ class AttributeValueGenerator(object):
|
|||||||
setattr(obj, attr, func(obj, getdotattr(obj, source)))
|
setattr(obj, attr, func(obj, getdotattr(obj, source)))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class PropertyObserver(object):
|
|
||||||
def __init__(property, observed_property_path):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
generator = AttributeValueGenerator()
|
generator = AttributeValueGenerator()
|
||||||
|
|
||||||
|
|
||||||
|
@@ -238,6 +238,8 @@ def getdotattr(obj_or_class, dot_path):
|
|||||||
:param dot_path: Attribute path with dot mark as separator
|
:param dot_path: Attribute path with dot mark as separator
|
||||||
"""
|
"""
|
||||||
last = obj_or_class
|
last = obj_or_class
|
||||||
|
# Coerce object style paths to strings.
|
||||||
|
path = str(dot_path)
|
||||||
|
|
||||||
for path in dot_path.split('.'):
|
for path in dot_path.split('.'):
|
||||||
getter = attrgetter(path)
|
getter = attrgetter(path)
|
||||||
|
@@ -76,7 +76,7 @@ class AttrPath(object):
|
|||||||
def __invert__(self):
|
def __invert__(self):
|
||||||
def get_backref(part):
|
def get_backref(part):
|
||||||
prop = part.property
|
prop = part.property
|
||||||
backref = prop.backref
|
backref = prop.backref or prop.back_populates
|
||||||
if backref is None:
|
if backref is None:
|
||||||
raise Exception(
|
raise Exception(
|
||||||
"Invert failed because property '%s' of class "
|
"Invert failed because property '%s' of class "
|
||||||
|
@@ -123,133 +123,3 @@ class TestGeneratesWithSourcePath(DeepPathGeneratesTestCase):
|
|||||||
self.Document = Document
|
self.Document = Document
|
||||||
self.Section = Section
|
self.Section = Section
|
||||||
self.SubSection = SubSection
|
self.SubSection = SubSection
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class TestInstantAttributeValueGeneration(TestCase):
|
|
||||||
def create_models(self):
|
|
||||||
class Document(self.Base):
|
|
||||||
__tablename__ = 'document'
|
|
||||||
id = sa.Column(sa.Integer, primary_key=True)
|
|
||||||
name = sa.Column(sa.Unicode(255))
|
|
||||||
locale = sa.Column(sa.String(10))
|
|
||||||
|
|
||||||
class Section(self.Base):
|
|
||||||
__tablename__ = 'section'
|
|
||||||
id = sa.Column(sa.Integer, primary_key=True)
|
|
||||||
name = sa.Column(sa.Unicode(255))
|
|
||||||
locale = sa.Column(sa.String(10))
|
|
||||||
|
|
||||||
document_id = sa.Column(
|
|
||||||
sa.Integer, sa.ForeignKey(Document.id)
|
|
||||||
)
|
|
||||||
|
|
||||||
document = sa.orm.relationship(Document, backref='sections')
|
|
||||||
|
|
||||||
class SubSection(self.Base):
|
|
||||||
__tablename__ = 'subsection'
|
|
||||||
id = sa.Column(sa.Integer, primary_key=True)
|
|
||||||
name = sa.Column(sa.Unicode(255))
|
|
||||||
locale = sa.Column(sa.String(10))
|
|
||||||
|
|
||||||
section_id = sa.Column(
|
|
||||||
sa.Integer, sa.ForeignKey(Section.id)
|
|
||||||
)
|
|
||||||
|
|
||||||
section = sa.orm.relationship(Section, backref='subsections')
|
|
||||||
|
|
||||||
@generates(locale, source='section.document.locale')
|
|
||||||
def copy_locale(self, value):
|
|
||||||
return value
|
|
||||||
|
|
||||||
self.Document = Document
|
|
||||||
self.Section = Section
|
|
||||||
self.SubSection = SubSection
|
|
||||||
|
|
||||||
def test_change_parent_attribute(self):
|
|
||||||
document = self.Document(name=u'Document 1', locale='en')
|
|
||||||
section = self.Section(name=u'Section 1', document=document)
|
|
||||||
subsection = self.SubSection(name=u'Section 1', section=section)
|
|
||||||
document.locale = 'fi'
|
|
||||||
assert subsection.locale == 'fi'
|
|
||||||
|
|
||||||
def test_simple_assignment(self):
|
|
||||||
document = self.Document(name=u'Document 1', locale='en')
|
|
||||||
section = self.Section(name=u'Section 1', document=document)
|
|
||||||
subsection = self.SubSection(name=u'Section 1', section=section)
|
|
||||||
assert subsection.locale == 'en'
|
|
||||||
|
|
||||||
def test_intermediate_object_reference(self):
|
|
||||||
document = self.Document(name=u'Document 1', locale='en')
|
|
||||||
section = self.Section(name=u'Section 1', document=document)
|
|
||||||
subsection = self.SubSection(name=u'Section 1', section=section)
|
|
||||||
section.document = self.Document(name=u'Document 2', locale='sv')
|
|
||||||
assert subsection.locale == 'sv'
|
|
||||||
|
|
||||||
|
|
||||||
class TestInstantAttrGenerationWithScalars(TestCase):
|
|
||||||
def create_models(self):
|
|
||||||
class Document(self.Base):
|
|
||||||
__tablename__ = 'document'
|
|
||||||
id = sa.Column(sa.Integer, primary_key=True)
|
|
||||||
name = sa.Column(sa.Unicode(255))
|
|
||||||
locale = sa.Column(sa.String(10))
|
|
||||||
|
|
||||||
class Section(self.Base):
|
|
||||||
__tablename__ = 'section'
|
|
||||||
id = sa.Column(sa.Integer, primary_key=True)
|
|
||||||
name = sa.Column(sa.Unicode(255))
|
|
||||||
locale = sa.Column(sa.String(10))
|
|
||||||
|
|
||||||
document_id = sa.Column(
|
|
||||||
sa.Integer, sa.ForeignKey(Document.id)
|
|
||||||
)
|
|
||||||
|
|
||||||
document = sa.orm.relationship(
|
|
||||||
Document,
|
|
||||||
backref=sa.orm.backref('section', uselist=False)
|
|
||||||
)
|
|
||||||
|
|
||||||
class SubSection(self.Base):
|
|
||||||
__tablename__ = 'subsection'
|
|
||||||
id = sa.Column(sa.Integer, primary_key=True)
|
|
||||||
name = sa.Column(sa.Unicode(255))
|
|
||||||
locale = sa.Column(sa.String(10))
|
|
||||||
|
|
||||||
section_id = sa.Column(
|
|
||||||
sa.Integer, sa.ForeignKey(Section.id)
|
|
||||||
)
|
|
||||||
|
|
||||||
section = sa.orm.relationship(
|
|
||||||
Section,
|
|
||||||
backref=sa.orm.backref('subsection', uselist=False)
|
|
||||||
)
|
|
||||||
|
|
||||||
@generates(locale, source='section.document.locale')
|
|
||||||
def copy_locale(self, value):
|
|
||||||
return value
|
|
||||||
|
|
||||||
|
|
||||||
self.Document = Document
|
|
||||||
self.Section = Section
|
|
||||||
self.SubSection = SubSection
|
|
||||||
|
|
||||||
def test_change_parent_attribute(self):
|
|
||||||
document = self.Document(name=u'Document 1', locale='en')
|
|
||||||
section = self.Section(name=u'Section 1', document=document)
|
|
||||||
subsection = self.SubSection(name=u'Section 1', section=section)
|
|
||||||
document.locale = 'fi'
|
|
||||||
assert subsection.locale == 'fi'
|
|
||||||
|
|
||||||
def test_simple_assignment(self):
|
|
||||||
document = self.Document(name=u'Document 1', locale='en')
|
|
||||||
section = self.Section(name=u'Section 1', document=document)
|
|
||||||
subsection = self.SubSection(name=u'Section 1', section=section)
|
|
||||||
assert subsection.locale == 'en'
|
|
||||||
|
|
||||||
def test_intermediate_object_reference(self):
|
|
||||||
document = self.Document(name=u'Document 1', locale='en')
|
|
||||||
section = self.Section(name=u'Section 1', document=document)
|
|
||||||
subsection = self.SubSection(name=u'Section 1', section=section)
|
|
||||||
section.document = self.Document(name=u'Document 2', locale='sv')
|
|
||||||
assert subsection.locale == 'sv'
|
|
||||||
|
Reference in New Issue
Block a user