Fix 3 files with Windows line endings to Unix line endings.

Change-Id: Iadc8e5d195bf998a117da4b7102a8955e238dd4e
This commit is contained in:
David Ripton 2014-02-27 10:32:17 -05:00
parent 21fcdad0f4
commit 6502d4017a
3 changed files with 491 additions and 493 deletions

View File

@ -1,90 +1,90 @@
<?xml version="1.0"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<?python
import pudge
def initialize(t):
g = t.generator
if not hasattr(t, 'title'):
t.title = 'Untitled'
t.doc_title = g.index_document['title']
t.home_url = g.organization_url or g.blog_url or g.trac_url
t.home_title = g.organization
?>
<html xmlns="http://www.w3.org/1999/xhtml"
xmlns:py="http://purl.org/kid/ns#"
py:def="layout">
<head>
<title>${title}</title>
<link rel="stylesheet" type="text/css" href="layout.css"/>
<link py:if="generator.syndication_url"
rel="alternate"
type="application/rss+xml"
title="RSS 2.0" href="${generator.syndication_url}"/>
</head>
<body>
<div id="page">
<h1 class="doc-title"><a href="${home_url}">${home_title}</a></h1>
<div id="navcontainer">
<ul id="navlist">
<li class="pagenav">
<ul>
<li class="page_item">
<a href="index.html"
class="${'index.html'== destfile and 'selected' or ''}"
title="Project Home / Index">${doc_title}</a>
</li>
<li class="page_item">
<a href="module-index.html"
class="${'module-index.html'== destfile and 'selected' or ''}"
title="${doc_title.lower()} package and module reference">Modules</a>
</li>
<?python
trac_url = generator.trac_url
mailing_list_url = generator.mailing_list_url
?>
<li py:if="trac_url">
<a href="${trac_url}"
title="Wiki / Subversion / Roadmap / Bug Tracker"
>Trac</a>
</li>
<li py:if="generator.blog_url">
<a href="${generator.blog_url}">Blog</a>
</li>
<li py:if="mailing_list_url">
<a href="${mailing_list_url}"
class="${mailing_list_url == destfile and 'selected' or ''}"
title="Mailing List">Discuss</a>
</li>
</ul>
</li>
</ul>
</div>
<hr />
<div id="content" py:content="content()"/>
<div id="footer">
<?python license = generator.get_document('doc-license') ?>
<p style="float: left;">
built with
<a href="http://lesscode.org/projects/pudge/"
>pudge/${pudge.__version__}</a><br />
original design by
<a href="http://blog.ratterobert.com/"
>ratter / robert</a><br />
</p>
<p style="float:right;">
evan.rosson (at) gmail.com
</p>
</div>
</div>
</body>
</html>
<?xml version="1.0"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<?python
import pudge
def initialize(t):
g = t.generator
if not hasattr(t, 'title'):
t.title = 'Untitled'
t.doc_title = g.index_document['title']
t.home_url = g.organization_url or g.blog_url or g.trac_url
t.home_title = g.organization
?>
<html xmlns="http://www.w3.org/1999/xhtml"
xmlns:py="http://purl.org/kid/ns#"
py:def="layout">
<head>
<title>${title}</title>
<link rel="stylesheet" type="text/css" href="layout.css"/>
<link py:if="generator.syndication_url"
rel="alternate"
type="application/rss+xml"
title="RSS 2.0" href="${generator.syndication_url}"/>
</head>
<body>
<div id="page">
<h1 class="doc-title"><a href="${home_url}">${home_title}</a></h1>
<div id="navcontainer">
<ul id="navlist">
<li class="pagenav">
<ul>
<li class="page_item">
<a href="index.html"
class="${'index.html'== destfile and 'selected' or ''}"
title="Project Home / Index">${doc_title}</a>
</li>
<li class="page_item">
<a href="module-index.html"
class="${'module-index.html'== destfile and 'selected' or ''}"
title="${doc_title.lower()} package and module reference">Modules</a>
</li>
<?python
trac_url = generator.trac_url
mailing_list_url = generator.mailing_list_url
?>
<li py:if="trac_url">
<a href="${trac_url}"
title="Wiki / Subversion / Roadmap / Bug Tracker"
>Trac</a>
</li>
<li py:if="generator.blog_url">
<a href="${generator.blog_url}">Blog</a>
</li>
<li py:if="mailing_list_url">
<a href="${mailing_list_url}"
class="${mailing_list_url == destfile and 'selected' or ''}"
title="Mailing List">Discuss</a>
</li>
</ul>
</li>
</ul>
</div>
<hr />
<div id="content" py:content="content()"/>
<div id="footer">
<?python license = generator.get_document('doc-license') ?>
<p style="float: left;">
built with
<a href="http://lesscode.org/projects/pudge/"
>pudge/${pudge.__version__}</a><br />
original design by
<a href="http://blog.ratterobert.com/"
>ratter / robert</a><br />
</p>
<p style="float:right;">
evan.rosson (at) gmail.com
</p>
</div>
</div>
</body>
</html>

View File

@ -1,313 +1,313 @@
"""
DB2 database specific implementations of changeset classes.
"""
import logging
from ibm_db_sa import base
from sqlalchemy.schema import (AddConstraint,
CreateIndex,
DropConstraint)
from sqlalchemy.schema import (Index,
PrimaryKeyConstraint,
UniqueConstraint)
from migrate.changeset import ansisql
from migrate.changeset import constraint
from migrate import exceptions
LOG = logging.getLogger(__name__)
IBMDBSchemaGenerator = base.IBM_DBDDLCompiler
def get_server_version_info(dialect):
"""Returns the DB2 server major and minor version as a list of ints."""
return [int(ver_token) for ver_token in dialect.dbms_ver.split('.')[0:2]]
def is_unique_constraint_with_null_columns_supported(dialect):
"""Checks to see if the DB2 version is at least 10.5.
This is needed for checking if unique constraints with null columns
are supported.
"""
return get_server_version_info(dialect) >= [10, 5]
class IBMDBColumnGenerator(IBMDBSchemaGenerator,
ansisql.ANSIColumnGenerator):
def visit_column(self, column):
nullable = True
if not column.nullable:
nullable = False
column.nullable = True
table = self.start_alter_table(column)
self.append("ADD COLUMN ")
self.append(self.get_column_specification(column))
for cons in column.constraints:
self.traverse_single(cons)
if column.default is not None:
self.traverse_single(column.default)
self.execute()
#ALTER TABLE STATEMENTS
if not nullable:
self.start_alter_table(column)
self.append("ALTER COLUMN %s SET NOT NULL" %
self.preparer.format_column(column))
self.execute()
self.append("CALL SYSPROC.ADMIN_CMD('REORG TABLE %s')" %
self.preparer.format_table(table))
self.execute()
# add indexes and unique constraints
if column.index_name:
Index(column.index_name, column).create()
elif column.unique_name:
constraint.UniqueConstraint(column,
name=column.unique_name).create()
# SA bounds FK constraints to table, add manually
for fk in column.foreign_keys:
self.add_foreignkey(fk.constraint)
# add primary key constraint if needed
if column.primary_key_name:
pk = constraint.PrimaryKeyConstraint(
column, name=column.primary_key_name)
pk.create()
self.append("COMMIT")
self.execute()
self.append("CALL SYSPROC.ADMIN_CMD('REORG TABLE %s')" %
self.preparer.format_table(table))
self.execute()
class IBMDBColumnDropper(ansisql.ANSIColumnDropper):
def visit_column(self, column):
"""Drop a column from its table.
:param column: the column object
:type column: :class:`sqlalchemy.Column`
"""
#table = self.start_alter_table(column)
super(IBMDBColumnDropper, self).visit_column(column)
self.append("CALL SYSPROC.ADMIN_CMD('REORG TABLE %s')" %
self.preparer.format_table(column.table))
self.execute()
class IBMDBSchemaChanger(IBMDBSchemaGenerator, ansisql.ANSISchemaChanger):
def visit_table(self, table):
"""Rename a table; #38. Other ops aren't supported."""
self._rename_table(table)
self.append("TO %s" % self.preparer.quote(table.new_name, table.quote))
self.execute()
self.append("COMMIT")
self.execute()
def _rename_table(self, table):
self.append("RENAME TABLE %s " % self.preparer.format_table(table))
def visit_index(self, index):
old_name = self.preparer.quote(self._index_identifier(index.name),
index.quote)
new_name = self.preparer.quote(self._index_identifier(index.new_name),
index.quote)
self.append("RENAME INDEX %s TO %s" % (old_name, new_name))
self.execute()
self.append("COMMIT")
self.execute()
def _run_subvisit(self, delta, func, start_alter=True):
"""Runs visit method based on what needs to be changed on column"""
table = delta.table
if start_alter:
self.start_alter_table(table)
ret = func(table,
self.preparer.quote(delta.current_name, delta.quote),
delta)
self.execute()
self._reorg_table(self.preparer.format_table(delta.table))
def _reorg_table(self, delta):
self.append("CALL SYSPROC.ADMIN_CMD('REORG TABLE %s')" % delta)
self.execute()
def visit_column(self, delta):
keys = delta.keys()
tr = self.connection.begin()
column = delta.result_column.copy()
if 'type' in keys:
try:
self._run_subvisit(delta, self._visit_column_change, False)
except Exception as e:
LOG.warn("Unable to change the column type. Error: %s" % e)
if column.primary_key and 'primary_key' not in keys:
try:
self._run_subvisit(delta, self._visit_primary_key)
except Exception as e:
LOG.warn("Unable to add primary key. Error: %s" % e)
if 'nullable' in keys:
self._run_subvisit(delta, self._visit_column_nullable)
if 'server_default' in keys:
self._run_subvisit(delta, self._visit_column_default)
if 'primary_key' in keys:
self._run_subvisit(delta, self._visit_primary_key)
self._run_subvisit(delta, self._visit_unique_constraint)
if 'name' in keys:
try:
self._run_subvisit(delta, self._visit_column_name, False)
except Exception as e:
LOG.warn("Unable to change column %(name)s. Error: %(error)s" %
{'name': delta.current_name, 'error': e})
self._reorg_table(self.preparer.format_table(delta.table))
self.append("COMMIT")
self.execute()
tr.commit()
def _visit_unique_constraint(self, table, col_name, delta):
# Add primary key to the current column
self.append("ADD CONSTRAINT %s " % col_name)
self.append("UNIQUE (%s)" % col_name)
def _visit_primary_key(self, table, col_name, delta):
# Add primary key to the current column
self.append("ADD PRIMARY KEY (%s)" % col_name)
def _visit_column_name(self, table, col_name, delta):
column = delta.result_column.copy()
# Delete the primary key before renaming the column
if column.primary_key:
try:
self.start_alter_table(table)
self.append("DROP PRIMARY KEY")
self.execute()
except Exception:
LOG.debug("Continue since Primary key does not exist.")
self.start_alter_table(table)
new_name = self.preparer.format_column(delta.result_column)
self.append("RENAME COLUMN %s TO %s" % (col_name, new_name))
if column.primary_key:
# execute the rename before adding primary key back
self.execute()
self.start_alter_table(table)
self.append("ADD PRIMARY KEY (%s)" % new_name)
def _visit_column_nullable(self, table, col_name, delta):
self.append("ALTER COLUMN %s " % col_name)
nullable = delta['nullable']
if nullable:
self.append("DROP NOT NULL")
else:
self.append("SET NOT NULL")
def _visit_column_default(self, table, col_name, delta):
default_text = self.get_column_default_string(delta.result_column)
self.append("ALTER COLUMN %s " % col_name)
if default_text is None:
self.append("DROP DEFAULT")
else:
self.append("SET WITH DEFAULT %s" % default_text)
def _visit_column_change(self, table, col_name, delta):
column = delta.result_column.copy()
# Delete the primary key before
if column.primary_key:
try:
self.start_alter_table(table)
self.append("DROP PRIMARY KEY")
self.execute()
except Exception:
LOG.debug("Continue since Primary key does not exist.")
# Delete the identity before
try:
self.start_alter_table(table)
self.append("ALTER COLUMN %s DROP IDENTITY" % col_name)
self.execute()
except Exception:
LOG.debug("Continue since identity does not exist.")
column.default = None
if not column.table:
column.table = delta.table
self.start_alter_table(table)
self.append("ALTER COLUMN %s " % col_name)
self.append("SET DATA TYPE ")
type_text = self.dialect.type_compiler.process(
delta.result_column.type)
self.append(type_text)
class IBMDBConstraintGenerator(ansisql.ANSIConstraintGenerator):
def _visit_constraint(self, constraint):
constraint.name = self.get_constraint_name(constraint)
if (isinstance(constraint, UniqueConstraint) and
is_unique_constraint_with_null_columns_supported(
self.dialect)):
for column in constraint.columns._all_cols:
if column.nullable:
constraint.exclude_nulls = True
break
if getattr(constraint, 'exclude_nulls', None):
index = Index(constraint.name,
*(column for column in constraint.columns._all_cols),
unique=True)
sql = self.process(CreateIndex(index))
sql += ' EXCLUDE NULL KEYS'
else:
sql = self.process(AddConstraint(constraint))
self.append(sql)
self.execute()
class IBMDBConstraintDropper(ansisql.ANSIConstraintDropper,
ansisql.ANSIConstraintCommon):
def _visit_constraint(self, constraint):
constraint.name = self.get_constraint_name(constraint)
if (isinstance(constraint, UniqueConstraint) and
is_unique_constraint_with_null_columns_supported(
self.dialect)):
for column in constraint.columns._all_cols:
if column.nullable:
constraint.exclude_nulls = True
break
if getattr(constraint, 'exclude_nulls', None):
index_name = self.preparer.quote(
self._index_identifier(constraint.name), constraint.quote)
sql = 'DROP INDEX %s ' % index_name
else:
sql = self.process(DropConstraint(constraint,
cascade=constraint.cascade))
self.append(sql)
self.execute()
def visit_migrate_primary_key_constraint(self, constraint):
self.start_alter_table(constraint.table)
self.append("DROP PRIMARY KEY")
self.execute()
class IBMDBDialect(ansisql.ANSIDialect):
columngenerator = IBMDBColumnGenerator
columndropper = IBMDBColumnDropper
schemachanger = IBMDBSchemaChanger
constraintgenerator = IBMDBConstraintGenerator
constraintdropper = IBMDBConstraintDropper
"""
DB2 database specific implementations of changeset classes.
"""
import logging
from ibm_db_sa import base
from sqlalchemy.schema import (AddConstraint,
CreateIndex,
DropConstraint)
from sqlalchemy.schema import (Index,
PrimaryKeyConstraint,
UniqueConstraint)
from migrate.changeset import ansisql
from migrate.changeset import constraint
from migrate import exceptions
LOG = logging.getLogger(__name__)
IBMDBSchemaGenerator = base.IBM_DBDDLCompiler
def get_server_version_info(dialect):
"""Returns the DB2 server major and minor version as a list of ints."""
return [int(ver_token) for ver_token in dialect.dbms_ver.split('.')[0:2]]
def is_unique_constraint_with_null_columns_supported(dialect):
"""Checks to see if the DB2 version is at least 10.5.
This is needed for checking if unique constraints with null columns
are supported.
"""
return get_server_version_info(dialect) >= [10, 5]
class IBMDBColumnGenerator(IBMDBSchemaGenerator,
ansisql.ANSIColumnGenerator):
def visit_column(self, column):
nullable = True
if not column.nullable:
nullable = False
column.nullable = True
table = self.start_alter_table(column)
self.append("ADD COLUMN ")
self.append(self.get_column_specification(column))
for cons in column.constraints:
self.traverse_single(cons)
if column.default is not None:
self.traverse_single(column.default)
self.execute()
#ALTER TABLE STATEMENTS
if not nullable:
self.start_alter_table(column)
self.append("ALTER COLUMN %s SET NOT NULL" %
self.preparer.format_column(column))
self.execute()
self.append("CALL SYSPROC.ADMIN_CMD('REORG TABLE %s')" %
self.preparer.format_table(table))
self.execute()
# add indexes and unique constraints
if column.index_name:
Index(column.index_name, column).create()
elif column.unique_name:
constraint.UniqueConstraint(column,
name=column.unique_name).create()
# SA bounds FK constraints to table, add manually
for fk in column.foreign_keys:
self.add_foreignkey(fk.constraint)
# add primary key constraint if needed
if column.primary_key_name:
pk = constraint.PrimaryKeyConstraint(
column, name=column.primary_key_name)
pk.create()
self.append("COMMIT")
self.execute()
self.append("CALL SYSPROC.ADMIN_CMD('REORG TABLE %s')" %
self.preparer.format_table(table))
self.execute()
class IBMDBColumnDropper(ansisql.ANSIColumnDropper):
def visit_column(self, column):
"""Drop a column from its table.
:param column: the column object
:type column: :class:`sqlalchemy.Column`
"""
#table = self.start_alter_table(column)
super(IBMDBColumnDropper, self).visit_column(column)
self.append("CALL SYSPROC.ADMIN_CMD('REORG TABLE %s')" %
self.preparer.format_table(column.table))
self.execute()
class IBMDBSchemaChanger(IBMDBSchemaGenerator, ansisql.ANSISchemaChanger):
def visit_table(self, table):
"""Rename a table; #38. Other ops aren't supported."""
self._rename_table(table)
self.append("TO %s" % self.preparer.quote(table.new_name, table.quote))
self.execute()
self.append("COMMIT")
self.execute()
def _rename_table(self, table):
self.append("RENAME TABLE %s " % self.preparer.format_table(table))
def visit_index(self, index):
old_name = self.preparer.quote(self._index_identifier(index.name),
index.quote)
new_name = self.preparer.quote(self._index_identifier(index.new_name),
index.quote)
self.append("RENAME INDEX %s TO %s" % (old_name, new_name))
self.execute()
self.append("COMMIT")
self.execute()
def _run_subvisit(self, delta, func, start_alter=True):
"""Runs visit method based on what needs to be changed on column"""
table = delta.table
if start_alter:
self.start_alter_table(table)
ret = func(table,
self.preparer.quote(delta.current_name, delta.quote),
delta)
self.execute()
self._reorg_table(self.preparer.format_table(delta.table))
def _reorg_table(self, delta):
self.append("CALL SYSPROC.ADMIN_CMD('REORG TABLE %s')" % delta)
self.execute()
def visit_column(self, delta):
keys = delta.keys()
tr = self.connection.begin()
column = delta.result_column.copy()
if 'type' in keys:
try:
self._run_subvisit(delta, self._visit_column_change, False)
except Exception as e:
LOG.warn("Unable to change the column type. Error: %s" % e)
if column.primary_key and 'primary_key' not in keys:
try:
self._run_subvisit(delta, self._visit_primary_key)
except Exception as e:
LOG.warn("Unable to add primary key. Error: %s" % e)
if 'nullable' in keys:
self._run_subvisit(delta, self._visit_column_nullable)
if 'server_default' in keys:
self._run_subvisit(delta, self._visit_column_default)
if 'primary_key' in keys:
self._run_subvisit(delta, self._visit_primary_key)
self._run_subvisit(delta, self._visit_unique_constraint)
if 'name' in keys:
try:
self._run_subvisit(delta, self._visit_column_name, False)
except Exception as e:
LOG.warn("Unable to change column %(name)s. Error: %(error)s" %
{'name': delta.current_name, 'error': e})
self._reorg_table(self.preparer.format_table(delta.table))
self.append("COMMIT")
self.execute()
tr.commit()
def _visit_unique_constraint(self, table, col_name, delta):
# Add primary key to the current column
self.append("ADD CONSTRAINT %s " % col_name)
self.append("UNIQUE (%s)" % col_name)
def _visit_primary_key(self, table, col_name, delta):
# Add primary key to the current column
self.append("ADD PRIMARY KEY (%s)" % col_name)
def _visit_column_name(self, table, col_name, delta):
column = delta.result_column.copy()
# Delete the primary key before renaming the column
if column.primary_key:
try:
self.start_alter_table(table)
self.append("DROP PRIMARY KEY")
self.execute()
except Exception:
LOG.debug("Continue since Primary key does not exist.")
self.start_alter_table(table)
new_name = self.preparer.format_column(delta.result_column)
self.append("RENAME COLUMN %s TO %s" % (col_name, new_name))
if column.primary_key:
# execute the rename before adding primary key back
self.execute()
self.start_alter_table(table)
self.append("ADD PRIMARY KEY (%s)" % new_name)
def _visit_column_nullable(self, table, col_name, delta):
self.append("ALTER COLUMN %s " % col_name)
nullable = delta['nullable']
if nullable:
self.append("DROP NOT NULL")
else:
self.append("SET NOT NULL")
def _visit_column_default(self, table, col_name, delta):
default_text = self.get_column_default_string(delta.result_column)
self.append("ALTER COLUMN %s " % col_name)
if default_text is None:
self.append("DROP DEFAULT")
else:
self.append("SET WITH DEFAULT %s" % default_text)
def _visit_column_change(self, table, col_name, delta):
column = delta.result_column.copy()
# Delete the primary key before
if column.primary_key:
try:
self.start_alter_table(table)
self.append("DROP PRIMARY KEY")
self.execute()
except Exception:
LOG.debug("Continue since Primary key does not exist.")
# Delete the identity before
try:
self.start_alter_table(table)
self.append("ALTER COLUMN %s DROP IDENTITY" % col_name)
self.execute()
except Exception:
LOG.debug("Continue since identity does not exist.")
column.default = None
if not column.table:
column.table = delta.table
self.start_alter_table(table)
self.append("ALTER COLUMN %s " % col_name)
self.append("SET DATA TYPE ")
type_text = self.dialect.type_compiler.process(
delta.result_column.type)
self.append(type_text)
class IBMDBConstraintGenerator(ansisql.ANSIConstraintGenerator):
def _visit_constraint(self, constraint):
constraint.name = self.get_constraint_name(constraint)
if (isinstance(constraint, UniqueConstraint) and
is_unique_constraint_with_null_columns_supported(
self.dialect)):
for column in constraint.columns._all_cols:
if column.nullable:
constraint.exclude_nulls = True
break
if getattr(constraint, 'exclude_nulls', None):
index = Index(constraint.name,
*(column for column in constraint.columns._all_cols),
unique=True)
sql = self.process(CreateIndex(index))
sql += ' EXCLUDE NULL KEYS'
else:
sql = self.process(AddConstraint(constraint))
self.append(sql)
self.execute()
class IBMDBConstraintDropper(ansisql.ANSIConstraintDropper,
ansisql.ANSIConstraintCommon):
def _visit_constraint(self, constraint):
constraint.name = self.get_constraint_name(constraint)
if (isinstance(constraint, UniqueConstraint) and
is_unique_constraint_with_null_columns_supported(
self.dialect)):
for column in constraint.columns._all_cols:
if column.nullable:
constraint.exclude_nulls = True
break
if getattr(constraint, 'exclude_nulls', None):
index_name = self.preparer.quote(
self._index_identifier(constraint.name), constraint.quote)
sql = 'DROP INDEX %s ' % index_name
else:
sql = self.process(DropConstraint(constraint,
cascade=constraint.cascade))
self.append(sql)
self.execute()
def visit_migrate_primary_key_constraint(self, constraint):
self.start_alter_table(constraint.table)
self.append("DROP PRIMARY KEY")
self.execute()
class IBMDBDialect(ansisql.ANSIDialect):
columngenerator = IBMDBColumnGenerator
columndropper = IBMDBColumnDropper
schemachanger = IBMDBSchemaChanger
constraintgenerator = IBMDBConstraintGenerator
constraintdropper = IBMDBConstraintDropper

View File

@ -1,90 +1,88 @@
# lifted from Python 2.6, so we can use it in Python 2.5
import sys
class WarningMessage(object):
"""Holds the result of a single showwarning() call."""
_WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
"line")
def __init__(self, message, category, filename, lineno, file=None,
line=None):
local_values = locals()
for attr in self._WARNING_DETAILS:
setattr(self, attr, local_values[attr])
if category:
self._category_name = category.__name__
else:
self._category_name = None
def __str__(self):
return ("{message : %r, category : %r, filename : %r, lineno : %s, "
"line : %r}" % (self.message, self._category_name,
self.filename, self.lineno, self.line))
class catch_warnings(object):
"""A context manager that copies and restores the warnings filter upon
exiting the context.
The 'record' argument specifies whether warnings should be captured by a
custom implementation of warnings.showwarning() and be appended to a list
returned by the context manager. Otherwise None is returned by the context
manager. The objects appended to the list are arguments whose attributes
mirror the arguments to showwarning().
The 'module' argument is to specify an alternative module to the module
named 'warnings' and imported under that name. This argument is only useful
when testing the warnings module itself.
"""
def __init__(self, record=False, module=None):
"""Specify whether to record warnings and if an alternative module
should be used other than sys.modules['warnings'].
For compatibility with Python 3.0, please consider all arguments to be
keyword-only.
"""
self._record = record
if module is None:
self._module = sys.modules['warnings']
else:
self._module = module
self._entered = False
def __repr__(self):
args = []
if self._record:
args.append("record=True")
if self._module is not sys.modules['warnings']:
args.append("module=%r" % self._module)
name = type(self).__name__
return "%s(%s)" % (name, ", ".join(args))
def __enter__(self):
if self._entered:
raise RuntimeError("Cannot enter %r twice" % self)
self._entered = True
self._filters = self._module.filters
self._module.filters = self._filters[:]
self._showwarning = self._module.showwarning
if self._record:
log = []
def showwarning(*args, **kwargs):
log.append(WarningMessage(*args, **kwargs))
self._module.showwarning = showwarning
return log
else:
return None
def __exit__(self, *exc_info):
if not self._entered:
raise RuntimeError("Cannot exit %r without entering first" % self)
self._module.filters = self._filters
self._module.showwarning = self._showwarning
# lifted from Python 2.6, so we can use it in Python 2.5
import sys
class WarningMessage(object):
"""Holds the result of a single showwarning() call."""
_WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
"line")
def __init__(self, message, category, filename, lineno, file=None,
line=None):
local_values = locals()
for attr in self._WARNING_DETAILS:
setattr(self, attr, local_values[attr])
if category:
self._category_name = category.__name__
else:
self._category_name = None
def __str__(self):
return ("{message : %r, category : %r, filename : %r, lineno : %s, "
"line : %r}" % (self.message, self._category_name,
self.filename, self.lineno, self.line))
class catch_warnings(object):
"""A context manager that copies and restores the warnings filter upon
exiting the context.
The 'record' argument specifies whether warnings should be captured by a
custom implementation of warnings.showwarning() and be appended to a list
returned by the context manager. Otherwise None is returned by the context
manager. The objects appended to the list are arguments whose attributes
mirror the arguments to showwarning().
The 'module' argument is to specify an alternative module to the module
named 'warnings' and imported under that name. This argument is only useful
when testing the warnings module itself.
"""
def __init__(self, record=False, module=None):
"""Specify whether to record warnings and if an alternative module
should be used other than sys.modules['warnings'].
For compatibility with Python 3.0, please consider all arguments to be
keyword-only.
"""
self._record = record
if module is None:
self._module = sys.modules['warnings']
else:
self._module = module
self._entered = False
def __repr__(self):
args = []
if self._record:
args.append("record=True")
if self._module is not sys.modules['warnings']:
args.append("module=%r" % self._module)
name = type(self).__name__
return "%s(%s)" % (name, ", ".join(args))
def __enter__(self):
if self._entered:
raise RuntimeError("Cannot enter %r twice" % self)
self._entered = True
self._filters = self._module.filters
self._module.filters = self._filters[:]
self._showwarning = self._module.showwarning
if self._record:
log = []
def showwarning(*args, **kwargs):
log.append(WarningMessage(*args, **kwargs))
self._module.showwarning = showwarning
return log
else:
return None
def __exit__(self, *exc_info):
if not self._entered:
raise RuntimeError("Cannot exit %r without entering first" % self)
self._module.filters = self._filters
self._module.showwarning = self._showwarning