from collections import defaultdict
from copy import copy
import os

import sqlalchemy as sa
from sqlalchemy.engine.url import make_url
from sqlalchemy.exc import OperationalError, ProgrammingError
from sqlalchemy.schema import ForeignKeyConstraint, MetaData, Table


def escape_like(string, escape_char='*'):
    """
    Escapes the string parameter used in SQL LIKE expressions.

    The escape character itself is doubled first and then the LIKE wildcards
    ``%`` and ``_`` are prefixed with the escape character, so the order of
    the replacements matters.

        >>> from sqlalchemy_utils import escape_like
        >>> query = session.query(User).filter(
        ...     User.name.ilike(escape_like('John'))
        ... )

    NOTE: for the database to honour the escaping, the same character has to
    be declared as the escape character of the LIKE expression (for example
    through the ``escape`` argument of ``like()`` / ``ilike()``).

    :param string: a string to escape
    :param escape_char: escape character
    :returns: the escaped string
    """
    return (
        string
        .replace(escape_char, escape_char * 2)
        .replace('%', escape_char + '%')
        .replace('_', escape_char + '_')
    )


def is_auto_assigned_date_column(column):
    """
    Returns whether or not given SQLAlchemy Column object is an auto
    assigned DateTime or Date column.

    A column counts as auto assigned when it has any of ``default``,
    ``server_default``, ``onupdate`` or ``server_onupdate`` set. The value
    returned is the result of the ``and`` / ``or`` chain below, i.e. it is
    truthy or falsy but not necessarily a ``bool``.

    :param column: SQLAlchemy Column object
    """
    return (
        isinstance(column.type, (sa.DateTime, sa.Date))
        and (
            column.default or
            column.server_default or
            column.onupdate or
            column.server_onupdate
        )
    )


def database_exists(url):
    """Check if a database exists.

    :param url: A SQLAlchemy engine URL.
    :returns: ``True`` if the database exists, ``False`` otherwise.

    Performs backend-specific testing to quickly determine if a database
    exists on the server. ::

        database_exists('postgres://postgres@localhost/name')  #=> False
        create_database('postgres://postgres@localhost/name')
        database_exists('postgres://postgres@localhost/name')  #=> True

    Supports checking against a constructed URL as well. ::

        engine = create_engine('postgres://postgres@localhost/name')
        database_exists(engine.url)  #=> False
        create_database(engine.url)
        database_exists(engine.url)  #=> True

    """
    url = copy(make_url(url))
    database = url.database
    # Connect without selecting the target database: it may not exist yet.
    url.database = None

    engine = sa.create_engine(url)
    try:
        dialect_name = engine.dialect.name

        if dialect_name == 'postgresql':
            # Bound parameter instead of string interpolation so that names
            # containing quotes can neither break nor alter the query.
            text = sa.text(
                "SELECT 1 FROM pg_database WHERE datname = :database"
            )
            return bool(engine.execute(text, database=database).scalar())

        if dialect_name == 'mysql':
            text = sa.text(
                "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA "
                "WHERE SCHEMA_NAME = :database"
            )
            return bool(engine.execute(text, database=database).scalar())

        if dialect_name == 'sqlite':
            # A SQLite URL without a path designates an in-memory database,
            # which (like ':memory:') always "exists".
            return (
                not database or
                database == ':memory:' or
                os.path.exists(database)
            )
    finally:
        # Do not leave pooled connections behind for a throw-away engine.
        engine.dispose()

    # Unknown backend: the only generic test is to try to connect to the
    # database itself and run a trivial statement.
    url.database = database
    engine = sa.create_engine(url)
    try:
        engine.execute('SELECT 1').close()
        return True
    except (ProgrammingError, OperationalError):
        return False
    finally:
        # An open connection to the target database could otherwise block a
        # subsequent drop_database() call.
        engine.dispose()


def create_database(url, encoding='utf8'):
    """Issue the appropriate CREATE DATABASE statement.

    :param url: A SQLAlchemy engine URL.
    :param encoding: The encoding to create the database as.

    To create a database, you can pass a simple URL that would have
    been passed to ``create_engine``. ::

        create_database('postgres://postgres@localhost/name')

    You may also pass the url from an existing engine. ::

        create_database(engine.url)

    Has full support for mysql, postgres, and sqlite. In theory,
    other database engines should be supported.

    For SQLite an empty file is written at the database path (an existing
    file at that path is truncated); in-memory SQLite databases need no
    creation and are left alone.
    """
    url = copy(make_url(url))
    database = url.database

    if not url.drivername.startswith('sqlite'):
        # Connect to the server without selecting the not-yet-existing
        # database.
        url.database = None

    engine = sa.create_engine(url)
    try:
        dialect_name = engine.dialect.name

        if dialect_name == 'sqlite':
            # In-memory databases (':memory:' or no path at all) have nothing
            # to create; issuing CREATE DATABASE would be a syntax error.
            if database and database != ':memory:':
                with open(database, 'w'):
                    pass
            return

        if dialect_name == 'postgresql':
            if engine.driver == 'psycopg2':
                # PostgreSQL refuses CREATE DATABASE inside a transaction
                # block, so the connection is switched to autocommit.
                # NOTE(review): this relies on the pool handing the same
                # DBAPI connection back to engine.execute() below -- confirm.
                from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
                engine.raw_connection().set_isolation_level(
                    ISOLATION_LEVEL_AUTOCOMMIT
                )
            text = "CREATE DATABASE %s ENCODING = '%s'" % (database, encoding)
        elif dialect_name == 'mysql':
            text = "CREATE DATABASE %s CHARACTER SET = '%s'" % (
                database, encoding
            )
        else:
            text = "CREATE DATABASE %s" % database

        engine.execute(text)
    finally:
        engine.dispose()


def drop_database(url):
    """Issue the appropriate DROP DATABASE statement.

    :param url: A SQLAlchemy engine URL.

    Works similar to the :ref:`create_database` method in that both url text
    and a constructed url are accepted. ::

        drop_database('postgres://postgres@localhost/name')
        drop_database(engine.url)

    For SQLite the database file is removed; in-memory SQLite databases have
    nothing to drop and are left alone.
    """
    url = copy(make_url(url))
    database = url.database

    if not url.drivername.startswith('sqlite'):
        # Connect to the server without selecting the database that is about
        # to be dropped.
        url.database = None

    engine = sa.create_engine(url)
    try:
        dialect_name = engine.dialect.name

        if dialect_name == 'sqlite':
            # In-memory databases (':memory:' or no path at all) have no file
            # to remove; issuing DROP DATABASE would be a syntax error.
            if database and database != ':memory:':
                os.remove(database)
            return

        if dialect_name == 'postgresql' and engine.driver == 'psycopg2':
            # PostgreSQL refuses DROP DATABASE inside a transaction block, so
            # the connection is switched to autocommit.
            # NOTE(review): this relies on the pool handing the same DBAPI
            # connection back to engine.execute() below -- confirm.
            from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
            engine.raw_connection().set_isolation_level(
                ISOLATION_LEVEL_AUTOCOMMIT
            )

        engine.execute("DROP DATABASE %s" % database)
    finally:
        engine.dispose()


def non_indexed_foreign_keys(metadata, engine=None):
    """
    Finds all non indexed foreign keys from all tables of given MetaData.

    Very useful for optimizing postgresql database and finding out which
    foreign keys need indexes.

    Every table of the given MetaData is reflected from the database into a
    separate MetaData object, so the indexes that are inspected are the ones
    that actually exist in the database.

    :param metadata: MetaData object to inspect tables from
    :param engine: engine to reflect the tables with, used when the given
        metadata has no bind
    :returns: dict that maps table names to lists of the
        ForeignKeyConstraint objects that have no matching index
    :raises Exception: if neither the metadata is bound nor an engine is
        given
    """
    if metadata.bind is None and engine is None:
        raise Exception(
            'Either pass a metadata object with bind or '
            'pass engine as a second parameter'
        )
    bind = metadata.bind or engine

    reflected_metadata = MetaData()
    constraints = defaultdict(list)

    for declared_table in metadata.tables.values():
        # The keys of ``metadata.tables`` are 'schema.name' for schema
        # qualified tables, hence the name and the schema are passed
        # separately for reflection.
        table = Table(
            declared_table.name,
            reflected_metadata,
            schema=declared_table.schema,
            autoload=True,
            autoload_with=bind
        )

        for constraint in table.constraints:
            if not isinstance(constraint, ForeignKeyConstraint):
                continue

            if not is_indexed_foreign_key(constraint):
                constraints[table.name].append(constraint)

    return dict(constraints)


def is_indexed_foreign_key(constraint):
    """
    Whether or not given foreign key constraint's columns have been indexed.

    The constraint counts as indexed when some index of its table covers
    exactly the same set of column names as the constraint.

    :param constraint: ForeignKeyConstraint object to check the indexes
    :returns: ``True`` if a matching index exists, ``False`` otherwise
    """
    # ``constraint.columns`` may contain plain column names or Column
    # objects; normalise to names so that the comparison with the index
    # column names below is meaningful in both cases.
    constrained_names = set(
        getattr(column, 'name', column)
        for column in constraint.columns
    )
    return any(
        set(column.name for column in index.columns) == constrained_names
        for index in constraint.table.indexes
    )