From c1a725eccbbdbc8c6b9ff6ea024891b9517a273f Mon Sep 17 00:00:00 2001 From: Konsta Vesterinen Date: Wed, 30 Oct 2013 14:22:38 +0200 Subject: [PATCH] Refactored functions module --- sqlalchemy_utils/functions/__init__.py | 185 +------------------------ sqlalchemy_utils/functions/mock.py | 108 +++++++++++++++ sqlalchemy_utils/functions/render.py | 70 ++++++++++ 3 files changed, 184 insertions(+), 179 deletions(-) create mode 100644 sqlalchemy_utils/functions/mock.py create mode 100644 sqlalchemy_utils/functions/render.py diff --git a/sqlalchemy_utils/functions/__init__.py b/sqlalchemy_utils/functions/__init__.py index 998dc7d..1c58c1b 100644 --- a/sqlalchemy_utils/functions/__init__.py +++ b/sqlalchemy_utils/functions/__init__.py @@ -1,22 +1,21 @@ from collections import defaultdict -import six -import re -import datetime -import contextlib -import inspect import sqlalchemy as sa -from sqlalchemy.orm.query import Query from sqlalchemy.schema import MetaData, Table, ForeignKeyConstraint -from six.moves import cStringIO from .batch_fetch import batch_fetch, with_backrefs, CompositePath from .defer_except import defer_except +from .mock import create_mock_engine, mock_engine +from .render import render_expression, render_statement from .sort_query import sort_query, QuerySorterException __all__ = ( batch_fetch, + create_mock_engine, defer_except, + mock_engine, sort_query, + render_expression, + render_statement, with_backrefs, CompositePath, QuerySorterException @@ -232,175 +231,3 @@ def naturally_equivalent(obj, obj2): if not (getattr(obj, prop.key) == getattr(obj2, prop.key)): return False return True - - -def create_mock_engine(bind, stream=None): - """Create a mock SQLAlchemy engine from the passed engine or bind URL. - - :param bind: A SQLAlchemy engine or bind URL to mock. - :param stream: Render all DDL operations to the stream. - """ - - if not isinstance(bind, six.string_types): - bind_url = str(bind.url) - - else: - bind_url = bind - - if stream is not None: - - def dump(sql, *args, **kwargs): - - class Compiler(type(sql._compiler(engine.dialect))): - - def visit_bindparam(self, bindparam, *args, **kwargs): - return self.render_literal_value( - bindparam.value, bindparam.type) - - def render_literal_value(self, value, type_): - if isinstance(value, six.integer_types): - return str(value) - - elif isinstance(value, (datetime.date, datetime.datetime)): - return "'%s'" % value - - return super(Compiler, self).render_literal_value( - value, type_) - - text = str(Compiler(engine.dialect, sql).process(sql)) - text = re.sub(r'\n+', '\n', text) - text = text.strip('\n').strip() - - stream.write('\n%s;' % text) - - else: - - dump = lambda *a, **kw: None - - engine = sa.create_engine(bind_url, strategy='mock', executor=dump) - return engine - - -@contextlib.contextmanager -def mock_engine(engine, stream=None): - """Mocks out the engine specified in the passed bind expression. - - Note this function is meant for convenience and protected usage. Do NOT - blindly pass user input to this function as it uses exec. - - :param engine: A python expression that represents the engine to mock. - :param stream: Render all DDL operations to the stream. - """ - - # Create a stream if not present. - - if stream is None: - stream = cStringIO() - - # Navigate the stack and find the calling frame that allows the - # expression to execuate. - - for frame in inspect.stack()[1:]: - - try: - frame = frame[0] - expression = '__target = %s' % engine - six.exec_(expression, frame.f_globals, frame.f_locals) - target = frame.f_locals['__target'] - break - - except: - pass - - else: - - raise ValueError('Not a valid python expression', engine) - - # Evaluate the expression and get the target engine. - - frame.f_locals['__mock'] = create_mock_engine(target, stream) - - # Replace the target with our mock. - - six.exec_('%s = __mock' % engine, frame.f_globals, frame.f_locals) - - # Give control back. - - yield stream - - # Put the target engine back. - - frame.f_locals['__target'] = target - six.exec_('%s = __target' % engine, frame.f_globals, frame.f_locals) - six.exec_('del __target', frame.f_globals, frame.f_locals) - six.exec_('del __mock', frame.f_globals, frame.f_locals) - - -def render_expression(expression, bind, stream=None): - """Generate a SQL expression from the passed python expression. - - Only the global variable, `engine`, is available for use in the - expression. Additional local variables may be passed in the context - parameter. - - Note this function is meant for convenience and protected usage. Do NOT - blindly pass user input to this function as it uses exec. - - :param bind: A SQLAlchemy engine or bind URL. - :param stream: Render all DDL operations to the stream. - """ - - # Create a stream if not present. - - if stream is None: - stream = cStringIO() - - engine = create_mock_engine(bind, stream) - - # Navigate the stack and find the calling frame that allows the - # expression to execuate. - - for frame in inspect.stack()[1:]: - - try: - frame = frame[0] - local = dict(frame.f_locals) - local['engine'] = engine - six.exec_(expression, frame.f_globals, local) - break - - except: - pass - - else: - - raise ValueError('Not a valid python expression', engine) - - return stream - - -def render_statement(statement, bind=None): - """ - Generate an SQL expression string with bound parameters rendered inline - for the given SQLAlchemy statement. - - :param statement: SQLAlchemy Query object. - :param bind: - Optional SQLAlchemy bind, if None uses the bind of the given query - object. - """ - - if isinstance(statement, Query): - if bind is None: - bind = statement.session.get_bind(statement._mapper_zero_or_none()) - - statement = statement.statement - - elif bind is None: - bind = statement.bind - - stream = cStringIO() - engine = create_mock_engine(bind.engine, stream=stream) - engine.execute(statement) - - return stream.getvalue() diff --git a/sqlalchemy_utils/functions/mock.py b/sqlalchemy_utils/functions/mock.py new file mode 100644 index 0000000..812a42c --- /dev/null +++ b/sqlalchemy_utils/functions/mock.py @@ -0,0 +1,108 @@ +import contextlib +import datetime +import inspect +import re +import six +import sqlalchemy as sa + + +def create_mock_engine(bind, stream=None): + """Create a mock SQLAlchemy engine from the passed engine or bind URL. + + :param bind: A SQLAlchemy engine or bind URL to mock. + :param stream: Render all DDL operations to the stream. + """ + + if not isinstance(bind, six.string_types): + bind_url = str(bind.url) + + else: + bind_url = bind + + if stream is not None: + + def dump(sql, *args, **kwargs): + + class Compiler(type(sql._compiler(engine.dialect))): + + def visit_bindparam(self, bindparam, *args, **kwargs): + return self.render_literal_value( + bindparam.value, bindparam.type) + + def render_literal_value(self, value, type_): + if isinstance(value, six.integer_types): + return str(value) + + elif isinstance(value, (datetime.date, datetime.datetime)): + return "'%s'" % value + + return super(Compiler, self).render_literal_value( + value, type_) + + text = str(Compiler(engine.dialect, sql).process(sql)) + text = re.sub(r'\n+', '\n', text) + text = text.strip('\n').strip() + + stream.write('\n%s;' % text) + + else: + + dump = lambda *a, **kw: None + + engine = sa.create_engine(bind_url, strategy='mock', executor=dump) + return engine + + +@contextlib.contextmanager +def mock_engine(engine, stream=None): + """Mocks out the engine specified in the passed bind expression. + + Note this function is meant for convenience and protected usage. Do NOT + blindly pass user input to this function as it uses exec. + + :param engine: A python expression that represents the engine to mock. + :param stream: Render all DDL operations to the stream. + """ + + # Create a stream if not present. + + if stream is None: + stream = six.moves.cStringIO() + + # Navigate the stack and find the calling frame that allows the + # expression to execuate. + + for frame in inspect.stack()[1:]: + + try: + frame = frame[0] + expression = '__target = %s' % engine + six.exec_(expression, frame.f_globals, frame.f_locals) + target = frame.f_locals['__target'] + break + + except: + pass + + else: + + raise ValueError('Not a valid python expression', engine) + + # Evaluate the expression and get the target engine. + + frame.f_locals['__mock'] = create_mock_engine(target, stream) + + # Replace the target with our mock. + + six.exec_('%s = __mock' % engine, frame.f_globals, frame.f_locals) + + # Give control back. + + yield stream + + # Put the target engine back. + + frame.f_locals['__target'] = target + six.exec_('%s = __target' % engine, frame.f_globals, frame.f_locals) + six.exec_('del __target', frame.f_globals, frame.f_locals) + six.exec_('del __mock', frame.f_globals, frame.f_locals) diff --git a/sqlalchemy_utils/functions/render.py b/sqlalchemy_utils/functions/render.py new file mode 100644 index 0000000..44010c5 --- /dev/null +++ b/sqlalchemy_utils/functions/render.py @@ -0,0 +1,70 @@ +import inspect +import six +import sqlalchemy as sa +from .mock import create_mock_engine + + +def render_expression(expression, bind, stream=None): + """Generate a SQL expression from the passed python expression. + + Only the global variable, `engine`, is available for use in the + expression. Additional local variables may be passed in the context + parameter. + + Note this function is meant for convenience and protected usage. Do NOT + blindly pass user input to this function as it uses exec. + + :param bind: A SQLAlchemy engine or bind URL. + :param stream: Render all DDL operations to the stream. + """ + + # Create a stream if not present. + + if stream is None: + stream = six.moves.cStringIO() + + engine = create_mock_engine(bind, stream) + + # Navigate the stack and find the calling frame that allows the + # expression to execuate. + + for frame in inspect.stack()[1:]: + try: + frame = frame[0] + local = dict(frame.f_locals) + local['engine'] = engine + six.exec_(expression, frame.f_globals, local) + break + except: + pass + else: + raise ValueError('Not a valid python expression', engine) + + return stream + + +def render_statement(statement, bind=None): + """ + Generate an SQL expression string with bound parameters rendered inline + for the given SQLAlchemy statement. + + :param statement: SQLAlchemy Query object. + :param bind: + Optional SQLAlchemy bind, if None uses the bind of the given query + object. + """ + + if isinstance(statement, sa.orm.query.Query): + if bind is None: + bind = statement.session.get_bind(statement._mapper_zero_or_none()) + + statement = statement.statement + + elif bind is None: + bind = statement.bind + + stream = six.moves.cStringIO() + engine = create_mock_engine(bind.engine, stream=stream) + engine.execute(statement) + + return stream.getvalue()