sqlalchemy-migrate/migrate/tests/changeset/test_constraint.py

299 lines
11 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from sqlalchemy import *
from sqlalchemy.util import *
from sqlalchemy.exc import *
from migrate.exceptions import *
from migrate.changeset import *
from migrate.tests import fixture
class CommonTestConstraint(fixture.DB):
"""helper functions to test constraints.
we just create a fresh new table and make sure everything is
as required.
"""
def _setup(self, url):
super(CommonTestConstraint, self)._setup(url)
self._create_table()
def _teardown(self):
if hasattr(self, 'table') and self.engine.has_table(self.table.name):
self.table.drop()
super(CommonTestConstraint, self)._teardown()
def _create_table(self):
self._connect(self.url)
self.meta = MetaData(self.engine)
self.tablename = 'mytable'
self.table = Table(self.tablename, self.meta,
Column(u'id', Integer, nullable=False),
Column(u'fkey', Integer, nullable=False),
mysql_engine='InnoDB')
if self.engine.has_table(self.table.name):
self.table.drop()
self.table.create()
# make sure we start at zero
self.assertEqual(len(self.table.primary_key), 0)
self.assert_(isinstance(self.table.primary_key,
schema.PrimaryKeyConstraint), self.table.primary_key.__class__)
class TestConstraint(CommonTestConstraint):
level = fixture.DB.CONNECT
def _define_pk(self, *cols):
# Add a pk by creating a PK constraint
if (self.engine.name in ('oracle', 'firebird')):
# Can't drop Oracle PKs without an explicit name
pk = PrimaryKeyConstraint(table=self.table, name='temp_pk_key', *cols)
else:
pk = PrimaryKeyConstraint(table=self.table, *cols)
self.compare_columns_equal(pk.columns, cols)
pk.create()
self.refresh_table()
if not self.url.startswith('sqlite'):
self.compare_columns_equal(self.table.primary_key, cols, ['type', 'autoincrement'])
# Drop the PK constraint
#if (self.engine.name in ('oracle', 'firebird')):
# # Apparently Oracle PK names aren't introspected
# pk.name = self.table.primary_key.name
pk.drop()
self.refresh_table()
self.assertEqual(len(self.table.primary_key), 0)
self.assert_(isinstance(self.table.primary_key, schema.PrimaryKeyConstraint))
return pk
@fixture.usedb(not_supported='sqlite')
def test_define_fk(self):
"""FK constraints can be defined, created, and dropped"""
# FK target must be unique
pk = PrimaryKeyConstraint(self.table.c.id, table=self.table, name="pkid")
pk.create()
# Add a FK by creating a FK constraint
if SQLA_07:
self.assertEqual(list(self.table.c.fkey.foreign_keys), [])
else:
self.assertEqual(self.table.c.fkey.foreign_keys._list, [])
fk = ForeignKeyConstraint([self.table.c.fkey],
[self.table.c.id],
name="fk_id_fkey",
ondelete="CASCADE")
if SQLA_07:
self.assert_(list(self.table.c.fkey.foreign_keys) is not [])
else:
self.assert_(self.table.c.fkey.foreign_keys._list is not [])
for key in fk.columns:
self.assertEqual(key, self.table.c.fkey.name)
self.assertEqual([e.column for e in fk.elements], [self.table.c.id])
self.assertEqual(list(fk.referenced), [self.table.c.id])
if self.url.startswith('mysql'):
# MySQL FKs need an index
index = Index('index_name', self.table.c.fkey)
index.create()
fk.create()
# test for ondelete/onupdate
if SQLA_07:
fkey = list(self.table.c.fkey.foreign_keys)[0]
else:
fkey = self.table.c.fkey.foreign_keys._list[0]
self.assertEqual(fkey.ondelete, "CASCADE")
# TODO: test on real db if it was set
self.refresh_table()
if SQLA_07:
self.assert_(list(self.table.c.fkey.foreign_keys) is not [])
else:
self.assert_(self.table.c.fkey.foreign_keys._list is not [])
fk.drop()
self.refresh_table()
if SQLA_07:
self.assertEqual(list(self.table.c.fkey.foreign_keys), [])
else:
self.assertEqual(self.table.c.fkey.foreign_keys._list, [])
@fixture.usedb()
def test_define_pk(self):
"""PK constraints can be defined, created, and dropped"""
self._define_pk(self.table.c.fkey)
@fixture.usedb()
def test_define_pk_multi(self):
"""Multicolumn PK constraints can be defined, created, and dropped"""
self._define_pk(self.table.c.id, self.table.c.fkey)
@fixture.usedb(not_supported=['firebird'])
def test_drop_cascade(self):
"""Drop constraint cascaded"""
pk = PrimaryKeyConstraint('fkey', table=self.table, name="id_pkey")
pk.create()
self.refresh_table()
# Drop the PK constraint forcing cascade
pk.drop(cascade=True)
# TODO: add real assertion if it was added
@fixture.usedb(supported=['mysql'])
def test_fail_mysql_check_constraints(self):
"""Check constraints raise NotSupported for mysql on drop"""
cons = CheckConstraint('id > 3', name="id_check", table=self.table)
cons.create()
self.refresh_table()
try:
cons.drop()
except NotSupportedError:
pass
else:
self.fail()
@fixture.usedb(not_supported=['sqlite', 'mysql'])
def test_named_check_constraints(self):
"""Check constraints can be defined, created, and dropped"""
self.assertRaises(InvalidConstraintError, CheckConstraint, 'id > 3')
cons = CheckConstraint('id > 3', name="id_check", table=self.table)
cons.create()
self.refresh_table()
self.table.insert(values={'id': 4, 'fkey': 1}).execute()
try:
self.table.insert(values={'id': 1, 'fkey': 1}).execute()
except (IntegrityError, ProgrammingError):
pass
else:
self.fail()
# Remove the name, drop the constraint; it should succeed
cons.drop()
self.refresh_table()
self.table.insert(values={'id': 2, 'fkey': 2}).execute()
self.table.insert(values={'id': 1, 'fkey': 2}).execute()
class TestAutoname(CommonTestConstraint):
"""Every method tests for a type of constraint wether it can autoname
itself and if you can pass object instance and names to classes.
"""
level = fixture.DB.CONNECT
@fixture.usedb(not_supported=['oracle', 'firebird'])
def test_autoname_pk(self):
"""PrimaryKeyConstraints can guess their name if None is given"""
# Don't supply a name; it should create one
cons = PrimaryKeyConstraint(self.table.c.id)
cons.create()
self.refresh_table()
if not self.url.startswith('sqlite'):
# TODO: test for index for sqlite
self.compare_columns_equal(cons.columns, self.table.primary_key, ['autoincrement', 'type'])
# Remove the name, drop the constraint; it should succeed
cons.name = None
cons.drop()
self.refresh_table()
self.assertEqual(list(), list(self.table.primary_key))
# test string names
cons = PrimaryKeyConstraint('id', table=self.table)
cons.create()
self.refresh_table()
if not self.url.startswith('sqlite'):
# TODO: test for index for sqlite
self.compare_columns_equal(cons.columns, self.table.primary_key)
cons.name = None
cons.drop()
@fixture.usedb(not_supported=['oracle', 'sqlite', 'firebird'])
def test_autoname_fk(self):
"""ForeignKeyConstraints can guess their name if None is given"""
cons = PrimaryKeyConstraint(self.table.c.id)
cons.create()
cons = ForeignKeyConstraint([self.table.c.fkey], [self.table.c.id])
cons.create()
self.refresh_table()
if SQLA_07:
list(self.table.c.fkey.foreign_keys)[0].column is self.table.c.id
else:
self.table.c.fkey.foreign_keys[0].column is self.table.c.id
# Remove the name, drop the constraint; it should succeed
cons.name = None
cons.drop()
self.refresh_table()
if SQLA_07:
self.assertEqual(list(self.table.c.fkey.foreign_keys), list())
else:
self.assertEqual(self.table.c.fkey.foreign_keys._list, list())
# test string names
cons = ForeignKeyConstraint(['fkey'], ['%s.id' % self.tablename], table=self.table)
cons.create()
self.refresh_table()
if SQLA_07:
list(self.table.c.fkey.foreign_keys)[0].column is self.table.c.id
else:
self.table.c.fkey.foreign_keys[0].column is self.table.c.id
# Remove the name, drop the constraint; it should succeed
cons.name = None
cons.drop()
@fixture.usedb(not_supported=['oracle', 'sqlite', 'mysql'])
def test_autoname_check(self):
"""CheckConstraints can guess their name if None is given"""
cons = CheckConstraint('id > 3', columns=[self.table.c.id])
cons.create()
self.refresh_table()
if not self.engine.name == 'mysql':
self.table.insert(values={'id': 4, 'fkey': 1}).execute()
try:
self.table.insert(values={'id': 1, 'fkey': 2}).execute()
except (IntegrityError, ProgrammingError):
pass
else:
self.fail()
# Remove the name, drop the constraint; it should succeed
cons.name = None
cons.drop()
self.refresh_table()
self.table.insert(values={'id': 2, 'fkey': 2}).execute()
self.table.insert(values={'id': 1, 'fkey': 3}).execute()
@fixture.usedb(not_supported=['oracle', 'sqlite'])
def test_autoname_unique(self):
"""UniqueConstraints can guess their name if None is given"""
cons = UniqueConstraint(self.table.c.fkey)
cons.create()
self.refresh_table()
self.table.insert(values={'fkey': 4, 'id': 1}).execute()
try:
self.table.insert(values={'fkey': 4, 'id': 2}).execute()
except (sqlalchemy.exc.IntegrityError,
sqlalchemy.exc.ProgrammingError):
pass
else:
self.fail()
# Remove the name, drop the constraint; it should succeed
cons.name = None
cons.drop()
self.refresh_table()
self.table.insert(values={'fkey': 4, 'id': 2}).execute()
self.table.insert(values={'fkey': 4, 'id': 1}).execute()