Files
deb-python-sqlalchemy-utils/sqlalchemy_utils/functions/database.py
2014-02-22 10:34:59 +02:00

237 lines
6.7 KiB
Python

from collections import defaultdict
from sqlalchemy.engine.url import make_url
import sqlalchemy as sa
from sqlalchemy.schema import MetaData, Table, ForeignKeyConstraint
from sqlalchemy.exc import ProgrammingError, OperationalError
import os
from copy import copy
def escape_like(string, escape_char='*'):
    """
    Escape a string so it can be used literally in a SQL LIKE expression.

    Occurrences of the escape character itself are doubled first, then the
    LIKE wildcards ``%`` and ``_`` are prefixed with the escape character.

    >>> from sqlalchemy_utils import escape_like
    >>> query = session.query(User).filter(
    ...     User.name.ilike(escape_like('John'), escape='*')
    ... )

    NOTE: the escaping only has an effect if the LIKE expression declares
    the same escape character (e.g. through its ``escape`` argument).

    :param string: a string to escape
    :param escape_char: escape character
    :returns: the escaped string
    """
    # Order matters: the escape character has to be doubled before it is
    # used to prefix the wildcards, otherwise those prefixes would be
    # doubled as well.
    substitutions = (
        (escape_char, escape_char * 2),
        ('%', escape_char + '%'),
        ('_', escape_char + '_'),
    )
    escaped = string
    for needle, replacement in substitutions:
        escaped = escaped.replace(needle, replacement)
    return escaped
def is_auto_assigned_date_column(column):
    """
    Tell whether the given SQLAlchemy Column is a DateTime or Date column
    whose value is assigned automatically.

    A column counts as auto assigned when any of its ``default``,
    ``server_default``, ``onupdate`` or ``server_onupdate`` attributes is
    truthy.

    :param column: SQLAlchemy Column object
    :returns: ``False`` when the column type is neither DateTime nor Date;
        otherwise the first truthy attribute among those listed above, or
        the (falsy) value of ``server_onupdate`` when none of them is set.
    """
    if not isinstance(column.type, (sa.DateTime, sa.Date)):
        return False

    # The result is intentionally the attribute value itself rather than a
    # strict boolean; callers are expected to use it in a boolean context.
    return (
        column.default
        or column.server_default
        or column.onupdate
        or column.server_onupdate
    )
def database_exists(url):
    """Check if a database exists.

    :param url: A SQLAlchemy engine URL.
    :returns: ``True`` if the database exists, ``False`` otherwise.

    Performs backend-specific testing to quickly determine if a database
    exists on the server. ::

        database_exists('postgres://postgres@localhost/name')  #=> False
        create_database('postgres://postgres@localhost/name')
        database_exists('postgres://postgres@localhost/name')  #=> True

    Supports checking against a constructed URL as well. ::

        engine = create_engine('postgres://postgres@localhost/name')
        database_exists(engine.url)  #=> False
        create_database(engine.url)
        database_exists(engine.url)  #=> True

    For PostgreSQL and MySQL the server catalog is queried, for SQLite the
    database file is looked up on disk, and for any other backend a
    connection to the database is attempted.
    """
    url = copy(make_url(url))
    database = url.database
    # Connect without selecting the database under test: it may not exist.
    url.database = None
    engine = sa.create_engine(url)
    dialect_name = engine.dialect.name

    try:
        if dialect_name == 'postgresql':
            # The name is passed as a bound parameter so that quotes in it
            # cannot break (or inject into) the statement.
            query = sa.text(
                "SELECT 1 FROM pg_database WHERE datname = :name"
            )
            return bool(engine.execute(query, name=database).scalar())
        elif dialect_name == 'mysql':
            query = sa.text(
                "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA "
                "WHERE SCHEMA_NAME = :name"
            )
            return bool(engine.execute(query, name=database).scalar())
        elif dialect_name == 'sqlite':
            # A URL without a database part ('sqlite://') designates an
            # in-memory database just like ':memory:' does; it always
            # "exists" and must not be handed to os.path.exists().
            if not database or database == ':memory:':
                return True
            return os.path.exists(database)
    finally:
        # Release the pooled connections of the probing engine.
        engine.dispose()

    # Unknown backend: the database exists if we can connect to it and run
    # a trivial statement.
    url.database = database
    engine = None
    try:
        engine = sa.create_engine(url)
        engine.execute('SELECT 1').close()
        return True
    except (ProgrammingError, OperationalError):
        return False
    finally:
        if engine is not None:
            engine.dispose()
def create_database(url, encoding='utf8'):
    """Issue the appropriate CREATE DATABASE statement.

    :param url: A SQLAlchemy engine URL.
    :param encoding: The encoding to create the database as.

    To create a database, you can pass a simple URL that would have
    been passed to ``create_engine``. ::

        create_database('postgres://postgres@localhost/name')

    You may also pass the url from an existing engine. ::

        create_database(engine.url)

    Has full support for mysql, postgres, and sqlite. In theory,
    other database engines should be supported.

    For SQLite an empty database file is written (truncating any existing
    file at that path); in-memory SQLite databases need no creation and
    are left alone.
    """
    url = copy(make_url(url))
    database = url.database
    if not url.drivername.startswith('sqlite'):
        # The database does not exist yet, so connect without selecting it.
        url.database = None

    engine = sa.create_engine(url)
    dialect_name = engine.dialect.name

    # NOTE(review): database and encoding names are interpolated verbatim
    # into the DDL below (identifiers cannot be sent as bound parameters);
    # only pass trusted values.
    try:
        if dialect_name == 'sqlite':
            # In-memory databases (':memory:' or no database part at all)
            # have no file to create and SQLite has no CREATE DATABASE
            # statement, so there is nothing to do for them.
            if database and database != ':memory:':
                open(database, 'w').close()
        elif dialect_name == 'postgresql':
            if engine.driver == 'psycopg2':
                # CREATE DATABASE cannot run inside a transaction block.
                from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
                engine.raw_connection().set_isolation_level(
                    ISOLATION_LEVEL_AUTOCOMMIT)
            text = "CREATE DATABASE %s ENCODING = '%s'" % (
                database, encoding
            )
            engine.execute(text)
        elif dialect_name == 'mysql':
            text = "CREATE DATABASE %s CHARACTER SET = '%s'" % (
                database, encoding
            )
            engine.execute(text)
        else:
            text = "CREATE DATABASE %s" % database
            engine.execute(text)
    finally:
        # Do not leave pooled connections of the helper engine behind.
        engine.dispose()
def drop_database(url):
    """Issue the appropriate DROP DATABASE statement.

    :param url: A SQLAlchemy engine URL.

    Works similar to the :ref:`create_database` method in that both url text
    and a constructed url are accepted. ::

        drop_database('postgres://postgres@localhost/name')
        drop_database(engine.url)

    For SQLite the database file is removed from disk; in-memory SQLite
    databases have nothing to drop and are left alone.

    :raises OSError: if the SQLite database file cannot be removed.
    """
    url = copy(make_url(url))
    database = url.database
    if not url.drivername.startswith('sqlite'):
        # Connect without selecting the database that is about to go away.
        url.database = None

    engine = sa.create_engine(url)
    dialect_name = engine.dialect.name

    try:
        if dialect_name == 'sqlite':
            # In-memory databases (':memory:' or no database part at all)
            # have no backing file and SQLite has no DROP DATABASE
            # statement, so there is nothing to do for them.
            if database and database != ':memory:':
                os.remove(database)
        else:
            if dialect_name == 'postgresql' and engine.driver == 'psycopg2':
                # DROP DATABASE cannot run inside a transaction block.
                from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
                engine.raw_connection().set_isolation_level(
                    ISOLATION_LEVEL_AUTOCOMMIT)
            # NOTE(review): the name is interpolated verbatim (identifiers
            # cannot be sent as bound parameters); only pass trusted values.
            text = "DROP DATABASE %s" % database
            engine.execute(text)
    finally:
        # Do not leave pooled connections of the helper engine behind.
        engine.dispose()
def non_indexed_foreign_keys(metadata, engine=None):
    """
    Collect every foreign key constraint without a matching index, for all
    tables of the given MetaData.

    Each table is reflected from the database into a fresh MetaData, so the
    result describes the indexes that actually exist there. Very useful for
    optimizing postgresql database and finding out which foreign keys need
    indexes.

    :param metadata: MetaData object to inspect tables from
    :param engine: engine used for reflection when ``metadata`` is not
        bound; ``metadata.bind`` takes precedence when it is set
    :returns: dict mapping table names to lists of their non indexed
        ForeignKeyConstraint objects; tables without such constraints are
        omitted
    :raises Exception: if neither ``metadata.bind`` nor ``engine`` is given
    """
    if metadata.bind is None and engine is None:
        raise Exception(
            'Either pass a metadata object with bind or '
            'pass engine as a second parameter'
        )

    reflected_metadata = MetaData()
    missing = {}
    for table_name in metadata.tables:
        reflected_table = Table(
            table_name,
            reflected_metadata,
            autoload=True,
            autoload_with=metadata.bind or engine
        )
        for constraint in reflected_table.constraints:
            if (
                isinstance(constraint, ForeignKeyConstraint) and
                not is_indexed_foreign_key(constraint)
            ):
                missing.setdefault(reflected_table.name, []).append(
                    constraint
                )
    return missing
def is_indexed_foreign_key(constraint):
    """
    Whether or not given foreign key constraint's columns have been indexed.

    The constraint counts as indexed when its table has an index covering
    exactly the same set of columns as the constraint (column order is
    ignored).

    :param constraint: ForeignKeyConstraint object to check the indexes
    :returns: ``True`` if a matching index exists, ``False`` otherwise
    """
    # ``constraint.columns`` may hold plain column names or Column objects;
    # normalise both to names so they are comparable with the names of the
    # index columns. Computed once, as it does not depend on the index.
    foreign_key_column_names = set(
        getattr(column, 'name', column) for column in constraint.columns
    )
    return any(
        set(column.name for column in index.columns) ==
        foreign_key_column_names
        for index in constraint.table.indexes
    )