Refactored functions module

This commit is contained in:
Konsta Vesterinen
2013-10-30 14:22:38 +02:00
parent 5da77d3d3a
commit c1a725eccb
3 changed files with 184 additions and 179 deletions

View File

@@ -1,22 +1,21 @@
from collections import defaultdict from collections import defaultdict
import six
import re
import datetime
import contextlib
import inspect
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy.orm.query import Query
from sqlalchemy.schema import MetaData, Table, ForeignKeyConstraint from sqlalchemy.schema import MetaData, Table, ForeignKeyConstraint
from six.moves import cStringIO
from .batch_fetch import batch_fetch, with_backrefs, CompositePath from .batch_fetch import batch_fetch, with_backrefs, CompositePath
from .defer_except import defer_except from .defer_except import defer_except
from .mock import create_mock_engine, mock_engine
from .render import render_expression, render_statement
from .sort_query import sort_query, QuerySorterException from .sort_query import sort_query, QuerySorterException
__all__ = ( __all__ = (
batch_fetch, batch_fetch,
create_mock_engine,
defer_except, defer_except,
mock_engine,
sort_query, sort_query,
render_expression,
render_statement,
with_backrefs, with_backrefs,
CompositePath, CompositePath,
QuerySorterException QuerySorterException
@@ -232,175 +231,3 @@ def naturally_equivalent(obj, obj2):
if not (getattr(obj, prop.key) == getattr(obj2, prop.key)): if not (getattr(obj, prop.key) == getattr(obj2, prop.key)):
return False return False
return True return True
def create_mock_engine(bind, stream=None):
"""Create a mock SQLAlchemy engine from the passed engine or bind URL.
:param bind: A SQLAlchemy engine or bind URL to mock.
:param stream: Render all DDL operations to the stream.
"""
if not isinstance(bind, six.string_types):
bind_url = str(bind.url)
else:
bind_url = bind
if stream is not None:
def dump(sql, *args, **kwargs):
class Compiler(type(sql._compiler(engine.dialect))):
def visit_bindparam(self, bindparam, *args, **kwargs):
return self.render_literal_value(
bindparam.value, bindparam.type)
def render_literal_value(self, value, type_):
if isinstance(value, six.integer_types):
return str(value)
elif isinstance(value, (datetime.date, datetime.datetime)):
return "'%s'" % value
return super(Compiler, self).render_literal_value(
value, type_)
text = str(Compiler(engine.dialect, sql).process(sql))
text = re.sub(r'\n+', '\n', text)
text = text.strip('\n').strip()
stream.write('\n%s;' % text)
else:
dump = lambda *a, **kw: None
engine = sa.create_engine(bind_url, strategy='mock', executor=dump)
return engine
@contextlib.contextmanager
def mock_engine(engine, stream=None):
"""Mocks out the engine specified in the passed bind expression.
Note this function is meant for convenience and protected usage. Do NOT
blindly pass user input to this function as it uses exec.
:param engine: A python expression that represents the engine to mock.
:param stream: Render all DDL operations to the stream.
"""
# Create a stream if not present.
if stream is None:
stream = cStringIO()
# Navigate the stack and find the calling frame that allows the
# expression to execuate.
for frame in inspect.stack()[1:]:
try:
frame = frame[0]
expression = '__target = %s' % engine
six.exec_(expression, frame.f_globals, frame.f_locals)
target = frame.f_locals['__target']
break
except:
pass
else:
raise ValueError('Not a valid python expression', engine)
# Evaluate the expression and get the target engine.
frame.f_locals['__mock'] = create_mock_engine(target, stream)
# Replace the target with our mock.
six.exec_('%s = __mock' % engine, frame.f_globals, frame.f_locals)
# Give control back.
yield stream
# Put the target engine back.
frame.f_locals['__target'] = target
six.exec_('%s = __target' % engine, frame.f_globals, frame.f_locals)
six.exec_('del __target', frame.f_globals, frame.f_locals)
six.exec_('del __mock', frame.f_globals, frame.f_locals)
def render_expression(expression, bind, stream=None):
"""Generate a SQL expression from the passed python expression.
Only the global variable, `engine`, is available for use in the
expression. Additional local variables may be passed in the context
parameter.
Note this function is meant for convenience and protected usage. Do NOT
blindly pass user input to this function as it uses exec.
:param bind: A SQLAlchemy engine or bind URL.
:param stream: Render all DDL operations to the stream.
"""
# Create a stream if not present.
if stream is None:
stream = cStringIO()
engine = create_mock_engine(bind, stream)
# Navigate the stack and find the calling frame that allows the
# expression to execuate.
for frame in inspect.stack()[1:]:
try:
frame = frame[0]
local = dict(frame.f_locals)
local['engine'] = engine
six.exec_(expression, frame.f_globals, local)
break
except:
pass
else:
raise ValueError('Not a valid python expression', engine)
return stream
def render_statement(statement, bind=None):
"""
Generate an SQL expression string with bound parameters rendered inline
for the given SQLAlchemy statement.
:param statement: SQLAlchemy Query object.
:param bind:
Optional SQLAlchemy bind, if None uses the bind of the given query
object.
"""
if isinstance(statement, Query):
if bind is None:
bind = statement.session.get_bind(statement._mapper_zero_or_none())
statement = statement.statement
elif bind is None:
bind = statement.bind
stream = cStringIO()
engine = create_mock_engine(bind.engine, stream=stream)
engine.execute(statement)
return stream.getvalue()

View File

@@ -0,0 +1,108 @@
import contextlib
import datetime
import inspect
import re
import six
import sqlalchemy as sa
def create_mock_engine(bind, stream=None):
"""Create a mock SQLAlchemy engine from the passed engine or bind URL.
:param bind: A SQLAlchemy engine or bind URL to mock.
:param stream: Render all DDL operations to the stream.
"""
if not isinstance(bind, six.string_types):
bind_url = str(bind.url)
else:
bind_url = bind
if stream is not None:
def dump(sql, *args, **kwargs):
class Compiler(type(sql._compiler(engine.dialect))):
def visit_bindparam(self, bindparam, *args, **kwargs):
return self.render_literal_value(
bindparam.value, bindparam.type)
def render_literal_value(self, value, type_):
if isinstance(value, six.integer_types):
return str(value)
elif isinstance(value, (datetime.date, datetime.datetime)):
return "'%s'" % value
return super(Compiler, self).render_literal_value(
value, type_)
text = str(Compiler(engine.dialect, sql).process(sql))
text = re.sub(r'\n+', '\n', text)
text = text.strip('\n').strip()
stream.write('\n%s;' % text)
else:
dump = lambda *a, **kw: None
engine = sa.create_engine(bind_url, strategy='mock', executor=dump)
return engine
@contextlib.contextmanager
def mock_engine(engine, stream=None):
"""Mocks out the engine specified in the passed bind expression.
Note this function is meant for convenience and protected usage. Do NOT
blindly pass user input to this function as it uses exec.
:param engine: A python expression that represents the engine to mock.
:param stream: Render all DDL operations to the stream.
"""
# Create a stream if not present.
if stream is None:
stream = six.moves.cStringIO()
# Navigate the stack and find the calling frame that allows the
# expression to execuate.
for frame in inspect.stack()[1:]:
try:
frame = frame[0]
expression = '__target = %s' % engine
six.exec_(expression, frame.f_globals, frame.f_locals)
target = frame.f_locals['__target']
break
except:
pass
else:
raise ValueError('Not a valid python expression', engine)
# Evaluate the expression and get the target engine.
frame.f_locals['__mock'] = create_mock_engine(target, stream)
# Replace the target with our mock.
six.exec_('%s = __mock' % engine, frame.f_globals, frame.f_locals)
# Give control back.
yield stream
# Put the target engine back.
frame.f_locals['__target'] = target
six.exec_('%s = __target' % engine, frame.f_globals, frame.f_locals)
six.exec_('del __target', frame.f_globals, frame.f_locals)
six.exec_('del __mock', frame.f_globals, frame.f_locals)

View File

@@ -0,0 +1,70 @@
import inspect
import six
import sqlalchemy as sa
from .mock import create_mock_engine
def render_expression(expression, bind, stream=None):
"""Generate a SQL expression from the passed python expression.
Only the global variable, `engine`, is available for use in the
expression. Additional local variables may be passed in the context
parameter.
Note this function is meant for convenience and protected usage. Do NOT
blindly pass user input to this function as it uses exec.
:param bind: A SQLAlchemy engine or bind URL.
:param stream: Render all DDL operations to the stream.
"""
# Create a stream if not present.
if stream is None:
stream = six.moves.cStringIO()
engine = create_mock_engine(bind, stream)
# Navigate the stack and find the calling frame that allows the
# expression to execuate.
for frame in inspect.stack()[1:]:
try:
frame = frame[0]
local = dict(frame.f_locals)
local['engine'] = engine
six.exec_(expression, frame.f_globals, local)
break
except:
pass
else:
raise ValueError('Not a valid python expression', engine)
return stream
def render_statement(statement, bind=None):
"""
Generate an SQL expression string with bound parameters rendered inline
for the given SQLAlchemy statement.
:param statement: SQLAlchemy Query object.
:param bind:
Optional SQLAlchemy bind, if None uses the bind of the given query
object.
"""
if isinstance(statement, sa.orm.query.Query):
if bind is None:
bind = statement.session.get_bind(statement._mapper_zero_or_none())
statement = statement.statement
elif bind is None:
bind = statement.bind
stream = six.moves.cStringIO()
engine = create_mock_engine(bind.engine, stream=stream)
engine.execute(statement)
return stream.getvalue()