Add auto_delete_orphans

This commit is contained in:
Konsta Vesterinen
2014-06-25 21:58:35 +03:00
parent 8d9cb7c25f
commit 737dd34934
5 changed files with 272 additions and 0 deletions

View File

@@ -14,3 +14,9 @@ Instant defaults
----------------
.. autofunction:: force_instant_defaults
Many-to-many orphan deletion
----------------------------
.. autofunction:: auto_delete_orphans

View File

@@ -32,6 +32,7 @@ from .functions import (
table_name,
)
from .listeners import (
auto_delete_orphans,
coercion_listener,
force_auto_coercion,
force_instant_defaults
@@ -75,6 +76,7 @@ __version__ = '0.26.3'
__all__ = (
aggregated,
auto_delete_orphans,
batch_fetch,
coercion_listener,
create_database,

View File

@@ -507,6 +507,8 @@ def has_any_changes(model, columns):
has_any_changes(user, ('name', 'age')) # True
.. versionadded: 0.26.3
:param obj: SQLAlchemy declarative model object
:param attrs: Names of the attributes
"""

View File

@@ -1,4 +1,5 @@
import sqlalchemy as sa
from .exceptions import ImproperlyConfigured
def coercion_listener(mapper, class_):
@@ -106,3 +107,147 @@ def force_instant_defaults(mapper=None):
if mapper is None:
mapper = sa.orm.mapper
sa.event.listen(mapper, 'init', instant_defaults_listener)
def auto_delete_orphans(attr):
"""
Delete orphans for given SQLAlchemy model attribute. This function can be
used for deleting many-to-many associated orphans easily. For more
information see https://bitbucket.org/zzzeek/sqlalchemy/wiki/UsageRecipes/ManyToManyOrphan.
Consider the following model definition:
::
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import event
Base = declarative_base()
tagging = Table(
'tagging',
Base.metadata,
Column(
'tag_id',
Integer,
ForeignKey('tag.id', ondelete='CASCADE'),
primary_key=True
),
Column(
'entry_id',
Integer,
ForeignKey('entry.id', ondelete='CASCADE'),
primary_key=True
)
)
class Tag(Base):
__tablename__ = 'tag'
id = Column(Integer, primary_key=True)
name = Column(String(100), unique=True, nullable=False)
def __init__(self, name=None):
self.name = name
class Entry(Base):
__tablename__ = 'entry'
id = Column(Integer, primary_key=True)
tags = relationship(
'Tag',
secondary=tagging,
backref='entries'
)
Now lets say we want to delete the tags if all their parents get deleted (
all Entry objects get deleted). This can be achieved as follows:
::
from sqlalchemy_utils import auto_delete_orphans
auto_delete_orphans(Entry.tags)
After we've set up this listener we can see it in action.
::
e = create_engine('sqlite://')
Base.metadata.create_all(e)
s = Session(e)
r1 = Entry()
r2 = Entry()
r3 = Entry()
t1, t2, t3, t4 = Tag('t1'), Tag('t2'), Tag('t3'), Tag('t4')
r1.tags.extend([t1, t2])
r2.tags.extend([t2, t3])
r3.tags.extend([t4])
s.add_all([r1, r2, r3])
assert s.query(Tag).count() == 4
r2.tags.remove(t2)
assert s.query(Tag).count() == 4
r1.tags.remove(t2)
assert s.query(Tag).count() == 3
r1.tags.remove(t1)
assert s.query(Tag).count() == 2
.. versionadded: 0.26.4
:param attr: Association relationship attribute to auto delete orphans from
"""
parent_class = attr.parent.class_
target_class = attr.property.mapper.class_
backref = attr.property.backref
if not backref:
raise ImproperlyConfigured(
'The relationship argument given for auto_delete_orphans needs to '
'have a backref relationship set.'
)
@sa.event.listens_for(sa.orm.Session, 'after_flush')
def delete_orphan_listener(session, ctx):
# Look through Session state to see if we want to emit a DELETE for
# orphans
orphans_found = (
any(
isinstance(obj, parent_class) and
sa.orm.attributes.get_history(obj, attr.key).deleted
for obj in session.dirty
) or
any(
isinstance(obj, parent_class)
for obj in session.deleted
)
)
if orphans_found:
# Emit a DELETE for all orphans
(
session.query(target_class)
.filter(
~getattr(target_class, attr.property.backref).any()
)
.delete(synchronize_session=False)
)

View File

@@ -0,0 +1,117 @@
from pytest import raises
import sqlalchemy as sa
from sqlalchemy_utils import auto_delete_orphans, ImproperlyConfigured
from tests import TestCase
class TestAutoDeleteOrphans(TestCase):
def create_models(self):
tagging = sa.Table(
'tagging',
self.Base.metadata,
sa.Column(
'tag_id',
sa.Integer,
sa.ForeignKey('tag.id', ondelete='cascade'),
primary_key=True
),
sa.Column(
'entry_id',
sa.Integer,
sa.ForeignKey('entry.id', ondelete='cascade'),
primary_key=True
)
)
class Tag(self.Base):
__tablename__ = 'tag'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.String(100), unique=True, nullable=False)
def __init__(self, name=None):
self.name = name
class Entry(self.Base):
__tablename__ = 'entry'
id = sa.Column(sa.Integer, primary_key=True)
tags = sa.orm.relationship(
'Tag',
secondary=tagging,
backref='entries'
)
auto_delete_orphans(Entry.tags)
self.Tag = Tag
self.Entry = Entry
def test_orphan_deletion(self):
r1 = self.Entry()
r2 = self.Entry()
r3 = self.Entry()
t1, t2, t3, t4 = (
self.Tag('t1'),
self.Tag('t2'),
self.Tag('t3'),
self.Tag('t4')
)
r1.tags.extend([t1, t2])
r2.tags.extend([t2, t3])
r3.tags.extend([t4])
self.session.add_all([r1, r2, r3])
assert self.session.query(self.Tag).count() == 4
r2.tags.remove(t2)
assert self.session.query(self.Tag).count() == 4
r1.tags.remove(t2)
assert self.session.query(self.Tag).count() == 3
r1.tags.remove(t1)
assert self.session.query(self.Tag).count() == 2
class TestAutoDeleteOrphansWithoutBackref(TestCase):
def create_models(self):
tagging = sa.Table(
'tagging',
self.Base.metadata,
sa.Column(
'tag_id',
sa.Integer,
sa.ForeignKey('tag.id', ondelete='cascade'),
primary_key=True
),
sa.Column(
'entry_id',
sa.Integer,
sa.ForeignKey('entry.id', ondelete='cascade'),
primary_key=True
)
)
class Tag(self.Base):
__tablename__ = 'tag'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.String(100), unique=True, nullable=False)
def __init__(self, name=None):
self.name = name
class Entry(self.Base):
__tablename__ = 'entry'
id = sa.Column(sa.Integer, primary_key=True)
tags = sa.orm.relationship(
'Tag',
secondary=tagging
)
self.Entry = Entry
def test_orphan_deletion(self):
with raises(ImproperlyConfigured):
auto_delete_orphans(self.Entry.tags)