Merge "Update Oslo"

This commit is contained in:
Jenkins 2013-10-15 09:29:00 +00:00 committed by Gerrit Code Review
commit c4944bd721
32 changed files with 1656 additions and 116 deletions

View File

@@ -40,13 +40,15 @@ class RequestContext(object):
"""
def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False,
read_only=False, show_deleted=False, request_id=None):
read_only=False, show_deleted=False, request_id=None,
instance_uuid=None):
self.auth_token = auth_token
self.user = user
self.tenant = tenant
self.is_admin = is_admin
self.read_only = read_only
self.show_deleted = show_deleted
self.instance_uuid = instance_uuid
if not request_id:
request_id = generate_request_id()
self.request_id = request_id
@@ -58,7 +60,8 @@ class RequestContext(object):
'read_only': self.read_only,
'show_deleted': self.show_deleted,
'auth_token': self.auth_token,
'request_id': self.request_id}
'request_id': self.request_id,
'instance_uuid': self.instance_uuid}
def get_admin_context(show_deleted=False):
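
Taken together, the two hunks above thread the new instance_uuid attribute through both the constructor and to_dict(). A minimal sketch of the round trip, assuming the usual oslo-incubator import path (the UUID value is made up):

from ceilometer.openstack.common.context import RequestContext

ctx = RequestContext(user='demo', tenant='demo',
                     instance_uuid='e3dd4d46-0c6d-4a4c-8ddf-53a5b5f8b908')
# The new field survives serialization into log/notification payloads.
assert ctx.to_dict()['instance_uuid'] == ctx.instance_uuid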

View File

@@ -43,3 +43,9 @@ class DBDeadlock(DBError):
class DBInvalidUnicodeParameter(Exception):
message = _("Invalid Parameter: "
"Unicode is not supported by the current database.")
class DbMigrationError(DBError):
"""Wraps migration specific exception."""
def __init__(self, message=None):
super(DbMigrationError, self).__init__(str(message))

View File

@@ -38,12 +38,53 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
import distutils.version as dist_version
import os
import re
import migrate
from migrate.changeset import ansisql
from migrate.changeset.databases import sqlite
from migrate.versioning import util as migrate_util
import sqlalchemy
from sqlalchemy.schema import UniqueConstraint
from ceilometer.openstack.common.db import exception
from ceilometer.openstack.common.db.sqlalchemy import session as db_session
from ceilometer.openstack.common.gettextutils import _ # noqa
@migrate_util.decorator
def patched_with_engine(f, *a, **kw):
url = a[0]
engine = migrate_util.construct_engine(url, **kw)
try:
kw['engine'] = engine
return f(*a, **kw)
finally:
if isinstance(engine, migrate_util.Engine) and engine is not url:
migrate_util.log.debug('Disposing SQLAlchemy engine %s', engine)
engine.dispose()
# TODO(jkoelker) When migrate 0.7.3 is released and nova depends
# on that version or higher, this can be removed
MIN_PKG_VERSION = dist_version.StrictVersion('0.7.3')
if (not hasattr(migrate, '__version__') or
dist_version.StrictVersion(migrate.__version__) < MIN_PKG_VERSION):
migrate_util.with_engine = patched_with_engine
# NOTE(jkoelker) Delay importing migrate until we are patched
from migrate import exceptions as versioning_exceptions
from migrate.versioning import api as versioning_api
from migrate.versioning.repository import Repository
_REPOSITORY = None
get_engine = db_session.get_engine
def _get_unique_constraints(self, table):
"""Retrieve information about existing unique constraints of the table
@@ -157,3 +198,81 @@ def patch_migrate():
_visit_migrate_unique_constraint
constraint_cls.__bases__ = (ansisql.ANSIColumnDropper,
sqlite.SQLiteConstraintGenerator)
def db_sync(abs_path, version=None, init_version=0):
"""Upgrade or downgrade a database.
Function runs the upgrade() or downgrade() functions in change scripts.
:param abs_path: Absolute path to migrate repository.
:param version: Database will upgrade/downgrade until this version.
If None, the database will update to the latest
available version.
:param init_version: Initial database version
"""
if version is not None:
try:
version = int(version)
except ValueError:
raise exception.DbMigrationError(
message=_("version should be an integer"))
current_version = db_version(abs_path, init_version)
repository = _find_migrate_repo(abs_path)
if version is None or version > current_version:
return versioning_api.upgrade(get_engine(), repository, version)
else:
return versioning_api.downgrade(get_engine(), repository,
version)
def db_version(abs_path, init_version):
"""Show the current version of the repository.
:param abs_path: Absolute path to migrate repository
:param init_version: Initial database version
"""
repository = _find_migrate_repo(abs_path)
try:
return versioning_api.db_version(get_engine(), repository)
except versioning_exceptions.DatabaseNotControlledError:
meta = sqlalchemy.MetaData()
engine = get_engine()
meta.reflect(bind=engine)
tables = meta.tables
if len(tables) == 0:
db_version_control(abs_path, init_version)
return versioning_api.db_version(get_engine(), repository)
else:
# Some pre-Essex DBs may not be version controlled.
# Require them to upgrade using Essex first.
raise exception.DbMigrationError(
message=_("Upgrade DB using Essex release first."))
def db_version_control(abs_path, version=None):
"""Mark a database as under this repository's version control.
Once a database is under version control, schema changes should
only be done via change scripts in this repository.
:param abs_path: Absolute path to migrate repository
:param version: Initial database version
"""
repository = _find_migrate_repo(abs_path)
versioning_api.version_control(get_engine(), repository, version)
return version
def _find_migrate_repo(abs_path):
"""Get the project's change script repository
:param abs_path: Absolute path to migrate repository
"""
global _REPOSITORY
if not os.path.exists(abs_path):
raise exception.DbMigrationError("Path %s not found" % abs_path)
if _REPOSITORY is None:
_REPOSITORY = Repository(abs_path)
return _REPOSITORY
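
A hedged usage sketch for the three public helpers above; the repository path is hypothetical, and the version check surfaces as the new DbMigrationError:

from ceilometer.openstack.common.db import exception
from ceilometer.openstack.common.db.sqlalchemy import migration

REPO = '/opt/stack/ceilometer/migrate_repo'  # hypothetical path

migration.db_sync(REPO)  # upgrade to the latest available version
print(migration.db_version(REPO, init_version=0))

try:
    migration.db_sync(REPO, version='not-a-number')
except exception.DbMigrationError:
    pass  # "version should be an integer"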

View File

@@ -61,13 +61,15 @@ class ModelBase(object):
def get(self, key, default=None):
return getattr(self, key, default)
def _get_extra_keys(self):
return []
def __iter__(self):
columns = dict(object_mapper(self).columns).keys()
# NOTE(russellb): Allow models to specify other keys that can be looked
# up, beyond the actual db columns. An example would be the 'name'
# property for an Instance.
if hasattr(self, '_extra_keys'):
columns.extend(self._extra_keys())
columns.extend(self._get_extra_keys())
self._i = iter(columns)
return self
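
With the move from an ad-hoc _extra_keys attribute to an overridable _get_extra_keys() hook, models opt in by overriding the method. A sketch assuming a declarative model; the Instance class and its name property are illustrative, echoing the note above:

from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base

from ceilometer.openstack.common.db.sqlalchemy.models import ModelBase

Base = declarative_base()


class Instance(ModelBase, Base):
    __tablename__ = 'instance'
    id = Column(Integer, primary_key=True)

    @property
    def name(self):
        return 'instance-%08x' % self.id

    def _get_extra_keys(self):
        # Expose the computed 'name' property alongside the real
        # columns when the model is iterated.
        return ['name']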

View File

@@ -241,11 +241,11 @@ Efficient use of soft deletes:
# This will produce count(bar_refs) db requests.
"""
import functools
import os.path
import re
import time
from eventlet import greenthread
from oslo.config import cfg
import six
from sqlalchemy import exc as sqla_exc
@@ -279,13 +279,13 @@ database_opts = [
deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_connection',
group='DATABASE')],
secret=True),
group='DATABASE'),
cfg.DeprecatedOpt('connection',
group='sql'), ]),
cfg.StrOpt('slave_connection',
default='',
help='The SQLAlchemy connection string used to connect to the '
'slave database',
secret=True),
'slave database'),
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
@@ -478,6 +478,11 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name):
if engine_name not in ["mysql", "sqlite", "postgresql"]:
return
# FIXME(johannes): The usage of the .message attribute has been
# deprecated since Python 2.6. However, the exceptions raised by
# SQLAlchemy can differ when using unicode() and accessing .message.
# An audit across all three supported engines will be necessary to
# ensure there are no regressions.
m = _DUP_KEY_RE_DB[engine_name].match(integrity_error.message)
if not m:
return
@@ -510,6 +515,11 @@ def _raise_if_deadlock_error(operational_error, engine_name):
re = _DEADLOCK_RE_DB.get(engine_name)
if re is None:
return
# FIXME(johannes): The usage of the .message attribute has been
# deprecated since Python 2.6. However, the exceptions raised by
# SQLAlchemy can differ when using unicode() and accessing .message.
# An audit across all three supported engines will be necessary to
# ensure there are no regressions.
m = re.match(operational_error.message)
if not m:
return
@@ -517,6 +527,7 @@ def _raise_if_deadlock_error(operational_error, engine_name):
def _wrap_db_error(f):
@functools.wraps(f)
def _wrap(*args, **kwargs):
try:
return f(*args, **kwargs)
@@ -541,7 +552,6 @@ def _wrap_db_error(f):
except Exception as e:
LOG.exception(_('DB exception wrapped.'))
raise exception.DBError(e)
_wrap.func_name = f.func_name
return _wrap
@@ -581,14 +591,16 @@ def _add_regexp_listener(dbapi_con, con_record):
dbapi_con.create_function('regexp', 2, regexp)
def _greenthread_yield(dbapi_con, con_record):
def _thread_yield(dbapi_con, con_record):
"""Ensure other greenthreads get a chance to be executed.
If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
execute instead of time.sleep(0).
Force a context switch. With common database backends (eg MySQLdb and
sqlite), there is no implicit yield caused by network I/O since they are
implemented by C libraries that eventlet cannot monkey patch.
"""
greenthread.sleep(0)
time.sleep(0)
def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
@@ -657,7 +669,7 @@ def create_engine(sql_connection, sqlite_fk=False):
engine = sqlalchemy.create_engine(sql_connection, **engine_args)
sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield)
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(engine, 'checkout', _ping_listener)
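
The listener rename works because, once eventlet.monkey_patch() is in effect, time.sleep(0) is the green sleep and yields to other greenthreads, while without eventlet it is a harmless no-op. A small sketch of the behaviour this relies on:

import eventlet
eventlet.monkey_patch()

import time


def worker(tag):
    for _ in range(3):
        print(tag)
        time.sleep(0)  # patched to the green sleep: yields control

a = eventlet.spawn(worker, 'a')
b = eventlet.spawn(worker, 'b')
a.wait()
b.wait()  # output interleaves 'a'/'b' because each sleep(0) switches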

View File

@@ -0,0 +1,289 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010-2011 OpenStack Foundation
# Copyright 2012-2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import commands
import ConfigParser
import os
import urlparse
import sqlalchemy
import sqlalchemy.exc
from ceilometer.openstack.common import lockutils
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import test
LOG = logging.getLogger(__name__)
def _get_connect_string(backend, user, passwd, database):
"""Get database connection
Try to get a connection with a very specific set of values, if we get
these then we'll run the tests, otherwise they are skipped
"""
if backend == "postgres":
backend = "postgresql+psycopg2"
elif backend == "mysql":
backend = "mysql+mysqldb"
else:
raise Exception("Unrecognized backend: '%s'" % backend)
return ("%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s"
% {'backend': backend, 'user': user, 'passwd': passwd,
'database': database})
def _is_backend_avail(backend, user, passwd, database):
try:
connect_uri = _get_connect_string(backend, user, passwd, database)
engine = sqlalchemy.create_engine(connect_uri)
connection = engine.connect()
except Exception:
# intentionally catch all to handle exceptions even if we don't
# have any backend code loaded.
return False
else:
connection.close()
engine.dispose()
return True
def _have_mysql(user, passwd, database):
present = os.environ.get('TEST_MYSQL_PRESENT')
if present is None:
return _is_backend_avail('mysql', user, passwd, database)
return present.lower() in ('', 'true')
def _have_postgresql(user, passwd, database):
present = os.environ.get('TEST_POSTGRESQL_PRESENT')
if present is None:
return _is_backend_avail('postgres', user, passwd, database)
return present.lower() in ('', 'true')
def get_db_connection_info(conn_pieces):
database = conn_pieces.path.strip('/')
loc_pieces = conn_pieces.netloc.split('@')
host = loc_pieces[1]
auth_pieces = loc_pieces[0].split(':')
user = auth_pieces[0]
password = ""
if len(auth_pieces) > 1:
password = auth_pieces[1].strip()
return (user, password, database, host)
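
For illustration, splitting a typical test URL with the helper above (credentials are made up):

import urlparse

pieces = urlparse.urlparse('mysql://openstack_citest:secret@localhost/citest')
print(get_db_connection_info(pieces))
# ('openstack_citest', 'secret', 'citest', 'localhost')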
class BaseMigrationTestCase(test.BaseTestCase):
"""Base class fort testing of migration utils."""
def __init__(self, *args, **kwargs):
super(BaseMigrationTestCase, self).__init__(*args, **kwargs)
self.DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__),
'test_migrations.conf')
# Test machines can set the TEST_MIGRATIONS_CONF variable
# to override the location of the config file for migration testing
self.CONFIG_FILE_PATH = os.environ.get('TEST_MIGRATIONS_CONF',
self.DEFAULT_CONFIG_FILE)
self.test_databases = {}
self.migration_api = None
def setUp(self):
super(BaseMigrationTestCase, self).setUp()
# Load test databases from the config file. Only do this
# once. No need to re-run this on each test...
LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH)
if os.path.exists(self.CONFIG_FILE_PATH):
cp = ConfigParser.RawConfigParser()
try:
cp.read(self.CONFIG_FILE_PATH)
defaults = cp.defaults()
for key, value in defaults.items():
self.test_databases[key] = value
except ConfigParser.ParsingError as e:
self.fail("Failed to read test_migrations.conf config "
"file. Got error: %s" % e)
else:
self.fail("Failed to find test_migrations.conf config "
"file.")
self.engines = {}
for key, value in self.test_databases.items():
self.engines[key] = sqlalchemy.create_engine(value)
# We start each test case with a completely blank slate.
self._reset_databases()
def tearDown(self):
# We destroy the test data store between each test case,
# and recreate it, which ensures that we have no side-effects
# from the tests
self._reset_databases()
super(BaseMigrationTestCase, self).tearDown()
def execute_cmd(self, cmd=None):
status, output = commands.getstatusoutput(cmd)
LOG.debug(output)
self.assertEqual(0, status,
"Failed to run: %s\n%s" % (cmd, output))
@lockutils.synchronized('pgadmin', 'tests-', external=True)
def _reset_pg(self, conn_pieces):
(user, password, database, host) = get_db_connection_info(conn_pieces)
os.environ['PGPASSWORD'] = password
os.environ['PGUSER'] = user
# note(boris-42): We must create and drop database, we can't
# drop database which we have connected to, so for such
# operations there is a special database template1.
sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
" '%(sql)s' -d template1")
sql = ("drop database if exists %s;") % database
droptable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
self.execute_cmd(droptable)
sql = ("create database %s;") % database
createtable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
self.execute_cmd(createtable)
os.unsetenv('PGPASSWORD')
os.unsetenv('PGUSER')
def _reset_databases(self):
for key, engine in self.engines.items():
conn_string = self.test_databases[key]
conn_pieces = urlparse.urlparse(conn_string)
engine.dispose()
if conn_string.startswith('sqlite'):
# We can just delete the SQLite database, which is
# the easiest and cleanest solution
db_path = conn_pieces.path.strip('/')
if os.path.exists(db_path):
os.unlink(db_path)
# No need to recreate the SQLite DB. SQLite will
# create it for us if it's not there...
elif conn_string.startswith('mysql'):
# We can execute the MySQL client to destroy and re-create
# the MYSQL database, which is easier and less error-prone
# than using SQLAlchemy to do this via MetaData...trust me.
(user, password, database, host) = \
get_db_connection_info(conn_pieces)
sql = ("drop database if exists %(db)s; "
"create database %(db)s;") % {'db': database}
cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s "
"-e \"%(sql)s\"") % {'user': user, 'password': password,
'host': host, 'sql': sql}
self.execute_cmd(cmd)
elif conn_string.startswith('postgresql'):
self._reset_pg(conn_pieces)
class WalkVersionsMixin(object):
def _walk_versions(self, engine=None, snake_walk=False, downgrade=True):
# Determine latest version script from the repo, then
# upgrade from 1 through to the latest, with no data
# in the databases. This just checks that the schema itself
# upgrades successfully.
# Place the database under version control
self.migration_api.version_control(engine, self.REPOSITORY,
self.INIT_VERSION)
self.assertEqual(self.INIT_VERSION,
self.migration_api.db_version(engine,
self.REPOSITORY))
LOG.debug('latest version is %s' % self.REPOSITORY.latest)
versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1)
for version in versions:
# upgrade -> downgrade -> upgrade
self._migrate_up(engine, version, with_data=True)
if snake_walk:
downgraded = self._migrate_down(
engine, version - 1, with_data=True)
if downgraded:
self._migrate_up(engine, version)
if downgrade:
# Now walk it back down to 0 from the latest, testing
# the downgrade paths.
for version in reversed(versions):
# downgrade -> upgrade -> downgrade
downgraded = self._migrate_down(engine, version - 1)
if snake_walk and downgraded:
self._migrate_up(engine, version)
self._migrate_down(engine, version - 1)
def _migrate_down(self, engine, version, with_data=False):
try:
self.migration_api.downgrade(engine, self.REPOSITORY, version)
except NotImplementedError:
# NOTE(sirp): some migrations, namely release-level
# migrations, don't support a downgrade.
return False
self.assertEqual(
version, self.migration_api.db_version(engine, self.REPOSITORY))
# NOTE(sirp): `version` is what we're downgrading to (i.e. the 'target'
# version). So if we have any downgrade checks, they need to be run for
# the previous (higher numbered) migration.
if with_data:
post_downgrade = getattr(
self, "_post_downgrade_%03d" % (version + 1), None)
if post_downgrade:
post_downgrade(engine)
return True
def _migrate_up(self, engine, version, with_data=False):
"""migrate up to a new version of the db.
We allow for data insertion and post checks at every
migration version with special _pre_upgrade_### and
_check_### functions in the main test.
"""
# NOTE(sdague): try block is here because it's impossible to debug
# where a failed data migration happens otherwise
try:
if with_data:
data = None
pre_upgrade = getattr(
self, "_pre_upgrade_%03d" % version, None)
if pre_upgrade:
data = pre_upgrade(engine)
self.migration_api.upgrade(engine, self.REPOSITORY, version)
self.assertEqual(version,
self.migration_api.db_version(engine,
self.REPOSITORY))
if with_data:
check = getattr(self, "_check_%03d" % version, None)
if check:
check(engine, data)
except Exception:
LOG.error("Failed to migrate to version %s on engine %s" %
(version, engine))
raise
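
A hedged sketch of how a project might wire the mixin and base class together; the repository path, INIT_VERSION and choice of migrate API module are assumptions modeled on how consumers typically use this code:

from migrate.versioning import api as versioning_api
from migrate.versioning.repository import Repository


class TestMigrations(BaseMigrationTestCase, WalkVersionsMixin):
    def setUp(self):
        super(TestMigrations, self).setUp()
        self.INIT_VERSION = 0
        self.REPOSITORY = Repository('/path/to/migrate_repo')  # hypothetical
        self.migration_api = versioning_api

    def test_walk_versions(self):
        # Exercise every upgrade/downgrade script on each configured engine.
        for engine in self.engines.values():
            self._walk_versions(engine, snake_walk=True, downgrade=True)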

View File

@@ -18,6 +18,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import re
from migrate.changeset import UniqueConstraint
import sqlalchemy
from sqlalchemy import Boolean
@@ -38,13 +40,21 @@ from sqlalchemy.types import NullType
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import exception
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import timeutils
LOG = logging.getLogger(__name__)
_DBURL_REGEX = re.compile(r"[^:]+://([^:]+):([^@]+)@.+")
def sanitize_db_url(url):
match = _DBURL_REGEX.match(url)
if match:
return '%s****:****%s' % (url[:match.start(1)], url[match.end(2):])
return url
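
The regex masks the credentials while leaving the rest of the URL intact; a quick sketch (URLs made up):

print(sanitize_db_url('mysql://ceilometer:s3cret@10.0.0.5/ceilometer'))
# mysql://****:****@10.0.0.5/ceilometer
print(sanitize_db_url('sqlite:////var/lib/ceilometer.db'))
# sqlite:////var/lib/ceilometer.db (no credentials, returned unchanged)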
class InvalidSortKey(Exception):
message = _("Sort key supplied was not valid.")
@@ -175,6 +185,10 @@ def visit_insert_from_select(element, compiler, **kw):
compiler.process(element.select))
class ColumnError(Exception):
"""Error raised when no column or an invalid column is found."""
def _get_not_supported_column(col_name_col_instance, column_name):
try:
column = col_name_col_instance[column_name]
@@ -182,13 +196,13 @@ def _get_not_supported_column(col_name_col_instance, column_name):
msg = _("Please specify column %s in col_name_col_instance "
"param. It is required because column has unsupported "
"type by sqlite).")
raise exception.OpenstackException(message=msg % column_name)
raise ColumnError(msg % column_name)
if not isinstance(column, Column):
msg = _("col_name_col_instance param has wrong type of "
"column instance for column %s It should be instance "
"of sqlalchemy.Column.")
raise exception.OpenstackException(message=msg % column_name)
raise ColumnError(msg % column_name)
return column
@@ -286,8 +300,7 @@ def _get_default_deleted_value(table):
return 0
if isinstance(table.c.id.type, String):
return ""
raise exception.OpenstackException(
message=_("Unsupported id columns type"))
raise ColumnError(_("Unsupported id columns type"))
def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes):
@@ -357,7 +370,7 @@ def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
constraints = [constraint.copy() for constraint in table.constraints]
meta = MetaData(bind=migrate_engine)
meta = table.metadata
new_table = Table(table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create()

View File

@@ -0,0 +1,21 @@
# Copyright 2013 Red Hat, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import warnings
msg = ("Modules in this package are deprecated "
"and will be removed in future releases")
warnings.warn(msg, DeprecationWarning)

View File

@@ -0,0 +1,736 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Utility methods for working with WSGI servers."""
import eventlet
eventlet.patcher.monkey_patch(all=False, socket=True)
import datetime
import errno
import socket
import time
import eventlet.wsgi
from oslo.config import cfg
import routes
import routes.middleware
import six
import webob.dec
import webob.exc
from xml.dom import minidom
from xml.parsers import expat
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import service
from ceilometer.openstack.common import sslutils
from ceilometer.openstack.common import xmlutils
socket_opts = [
cfg.IntOpt('backlog',
default=4096,
help="Number of backlog requests to configure the socket with"),
cfg.IntOpt('tcp_keepidle',
default=600,
help="Sets the value of TCP_KEEPIDLE in seconds for each "
"server socket. Not supported on OS X."),
]
CONF = cfg.CONF
CONF.register_opts(socket_opts)
LOG = logging.getLogger(__name__)
class MalformedRequestBody(Exception):
def __init__(self, reason):
super(MalformedRequestBody, self).__init__(
"Malformed message body: %s", reason)
class InvalidContentType(Exception):
def __init__(self, content_type):
super(InvalidContentType, self).__init__(
"Invalid content type %s", content_type)
def run_server(application, port, **kwargs):
"""Run a WSGI server with the given application."""
sock = eventlet.listen(('0.0.0.0', port))
eventlet.wsgi.server(sock, application, **kwargs)
class Service(service.Service):
"""Provides a Service API for wsgi servers.
This gives us the ability to launch wsgi servers with the
Launcher classes in service.py.
"""
def __init__(self, application, port,
host='0.0.0.0', backlog=4096, threads=1000):
self.application = application
self._port = port
self._host = host
self._backlog = backlog if backlog else CONF.backlog
self._socket = self._get_socket(host, port, self._backlog)
super(Service, self).__init__(threads)
def _get_socket(self, host, port, backlog):
# TODO(dims): eventlet's green dns/socket module does not actually
# support IPv6 in getaddrinfo(). We need to get around this in the
# future or monitor upstream for a fix
info = socket.getaddrinfo(host,
port,
socket.AF_UNSPEC,
socket.SOCK_STREAM)[0]
family = info[0]
bind_addr = info[-1]
sock = None
retry_until = time.time() + 30
while not sock and time.time() < retry_until:
try:
sock = eventlet.listen(bind_addr,
backlog=backlog,
family=family)
if sslutils.is_enabled():
sock = sslutils.wrap(sock)
except socket.error as err:
if err.args[0] != errno.EADDRINUSE:
raise
eventlet.sleep(0.1)
if not sock:
raise RuntimeError(_("Could not bind to %(host)s:%(port)s "
"after trying for 30 seconds") %
{'host': host, 'port': port})
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# This option isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE,
CONF.tcp_keepidle)
return sock
def start(self):
"""Start serving this service using the provided server instance.
:returns: None
"""
super(Service, self).start()
self.tg.add_thread(self._run, self.application, self._socket)
@property
def backlog(self):
return self._backlog
@property
def host(self):
return self._socket.getsockname()[0] if self._socket else self._host
@property
def port(self):
return self._socket.getsockname()[1] if self._socket else self._port
def stop(self):
"""Stop serving this API.
:returns: None
"""
super(Service, self).stop()
def _run(self, application, socket):
"""Start a WSGI server in a new green thread."""
logger = logging.getLogger('eventlet.wsgi')
eventlet.wsgi.server(socket,
application,
custom_pool=self.tg.pool,
log=logging.WritableLogger(logger))
class Router(object):
"""WSGI middleware that maps incoming requests to WSGI apps."""
def __init__(self, mapper):
"""Create a router for the given routes.Mapper.
Each route in `mapper` must specify a 'controller', which is a
WSGI app to call. You'll probably want to specify an 'action' as
well and have your controller be a wsgi.Controller, who will route
the request to the action method.
Examples:
mapper = routes.Mapper()
sc = ServerController()
# Explicit mapping of one route to a controller+action
mapper.connect(None, "/svrlist", controller=sc, action="list")
# Actions are all implicitly defined
mapper.resource("server", "servers", controller=sc)
# Pointing to an arbitrary WSGI app. You can specify the
# {path_info:.*} parameter so the target app can be handed just that
# section of the URL.
mapper.connect(None, "/v1.0/{path_info:.*}", controller=BlogApp())
"""
self.map = mapper
self._router = routes.middleware.RoutesMiddleware(self._dispatch,
self.map)
@webob.dec.wsgify
def __call__(self, req):
"""Route the incoming request to a controller based on self.map.
If no match, return a 404.
"""
return self._router
@staticmethod
@webob.dec.wsgify
def _dispatch(req):
"""Gets application from the environment.
Called by self._router after matching the incoming request to a route
and putting the information into req.environ. Either returns 404
or the routed WSGI app's response.
"""
match = req.environ['wsgiorg.routing_args'][1]
if not match:
return webob.exc.HTTPNotFound()
app = match['controller']
return app
class Request(webob.Request):
"""Add some Openstack API-specific logic to the base webob.Request."""
default_request_content_types = ('application/json', 'application/xml')
default_accept_types = ('application/json', 'application/xml')
default_accept_type = 'application/json'
def best_match_content_type(self, supported_content_types=None):
"""Determine the requested response content-type.
Based on the query extension then the Accept header.
Defaults to default_accept_type if we don't find a preference
"""
supported_content_types = (supported_content_types or
self.default_accept_types)
parts = self.path.rsplit('.', 1)
if len(parts) > 1:
ctype = 'application/{0}'.format(parts[1])
if ctype in supported_content_types:
return ctype
bm = self.accept.best_match(supported_content_types)
return bm or self.default_accept_type
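
For example, a path extension wins over the Accept header (the path is illustrative):

req = Request.blank('/v2/meters.json',
                    headers={'Accept': 'application/xml'})
print(req.best_match_content_type())  # application/json: extension wins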
def get_content_type(self, allowed_content_types=None):
"""Determine content type of the request body.
Does not do any body introspection, only checks header
"""
if "Content-Type" not in self.headers:
return None
content_type = self.content_type
allowed_content_types = (allowed_content_types or
self.default_request_content_types)
if content_type not in allowed_content_types:
raise InvalidContentType(content_type=content_type)
return content_type
class Resource(object):
"""WSGI app that handles (de)serialization and controller dispatch.
Reads routing information supplied by RoutesMiddleware and calls
the requested action method upon its deserializer, controller,
and serializer. Those three objects may implement any of the basic
controller action methods (create, update, show, index, delete)
along with any that may be specified in the api router. A 'default'
method may also be implemented to be used in place of any
non-implemented actions. Deserializer methods must accept a request
argument and return a dictionary. Controller methods must accept a
request argument. Additionally, they must also accept keyword
arguments that represent the keys returned by the Deserializer. They
may raise a webob.exc exception or return a dict, which will be
serialized by requested content type.
"""
def __init__(self, controller, deserializer=None, serializer=None):
"""Initiates Resource object.
:param controller: object that implement methods created by routes lib
:param deserializer: object that supports webob request deserialization
through controller-like actions
:param serializer: object that supports webob response serialization
through controller-like actions
"""
self.controller = controller
self.serializer = serializer or ResponseSerializer()
self.deserializer = deserializer or RequestDeserializer()
@webob.dec.wsgify(RequestClass=Request)
def __call__(self, request):
"""WSGI method that controls (de)serialization and method dispatch."""
try:
action, action_args, accept = self.deserialize_request(request)
except InvalidContentType:
msg = _("Unsupported Content-Type")
return webob.exc.HTTPUnsupportedMediaType(explanation=msg)
except MalformedRequestBody:
msg = _("Malformed request body")
return webob.exc.HTTPBadRequest(explanation=msg)
action_result = self.execute_action(action, request, **action_args)
try:
return self.serialize_response(action, action_result, accept)
# return unserializable result (typically a webob exc)
except Exception:
return action_result
def deserialize_request(self, request):
return self.deserializer.deserialize(request)
def serialize_response(self, action, action_result, accept):
return self.serializer.serialize(action_result, accept, action)
def execute_action(self, action, request, **action_args):
return self.dispatch(self.controller, action, request, **action_args)
def dispatch(self, obj, action, *args, **kwargs):
"""Find action-specific method on self and call it."""
try:
method = getattr(obj, action)
except AttributeError:
method = getattr(obj, 'default')
return method(*args, **kwargs)
def get_action_args(self, request_environment):
"""Parse dictionary created by routes library."""
try:
args = request_environment['wsgiorg.routing_args'][1].copy()
except Exception:
return {}
try:
del args['controller']
except KeyError:
pass
try:
del args['format']
except KeyError:
pass
return args
class ActionDispatcher(object):
"""Maps method name to local methods through action name."""
def dispatch(self, *args, **kwargs):
"""Find and call local method."""
action = kwargs.pop('action', 'default')
action_method = getattr(self, str(action), self.default)
return action_method(*args, **kwargs)
def default(self, data):
raise NotImplementedError()
class DictSerializer(ActionDispatcher):
"""Default request body serialization."""
def serialize(self, data, action='default'):
return self.dispatch(data, action=action)
def default(self, data):
return ""
class JSONDictSerializer(DictSerializer):
"""Default JSON request body serialization."""
def default(self, data):
def sanitizer(obj):
if isinstance(obj, datetime.datetime):
_dtime = obj - datetime.timedelta(microseconds=obj.microsecond)
return _dtime.isoformat()
return six.text_type(obj)
return jsonutils.dumps(data, default=sanitizer)
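
A small sketch of the sanitizer above: microseconds are stripped before isoformat(), and anything else non-JSON falls back to its text form:

import datetime

serializer = JSONDictSerializer()
print(serializer.serialize(
    {'at': datetime.datetime(2013, 10, 15, 9, 29, 0, 123456)}))
# {"at": "2013-10-15T09:29:00"}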
class XMLDictSerializer(DictSerializer):
def __init__(self, metadata=None, xmlns=None):
"""Initiates XMLDictSerializer object.
:param metadata: information needed to deserialize xml into
a dictionary.
:param xmlns: XML namespace to include with serialized xml
"""
super(XMLDictSerializer, self).__init__()
self.metadata = metadata or {}
self.xmlns = xmlns
def default(self, data):
# We expect data to contain a single key which is the XML root.
root_key = data.keys()[0]
doc = minidom.Document()
node = self._to_xml_node(doc, self.metadata, root_key, data[root_key])
return self.to_xml_string(node)
def to_xml_string(self, node, has_atom=False):
self._add_xmlns(node, has_atom)
return node.toprettyxml(indent=' ', encoding='UTF-8')
#NOTE (ameade): the has_atom should be removed after all of the
# xml serializers and view builders have been updated to the current
# spec that requires all responses to include the xmlns:atom; the has_atom
# flag is to prevent current tests from breaking.
def _add_xmlns(self, node, has_atom=False):
if self.xmlns is not None:
node.setAttribute('xmlns', self.xmlns)
if has_atom:
node.setAttribute('xmlns:atom', "http://www.w3.org/2005/Atom")
def _to_xml_node(self, doc, metadata, nodename, data):
"""Recursive method to convert data members to XML nodes."""
result = doc.createElement(nodename)
# Set the xml namespace if one is specified
# TODO(justinsb): We could also use prefixes on the keys
xmlns = metadata.get('xmlns', None)
if xmlns:
result.setAttribute('xmlns', xmlns)
#TODO(bcwaldon): accomplish this without a type-check
if type(data) is list:
collections = metadata.get('list_collections', {})
if nodename in collections:
metadata = collections[nodename]
for item in data:
node = doc.createElement(metadata['item_name'])
node.setAttribute(metadata['item_key'], str(item))
result.appendChild(node)
return result
singular = metadata.get('plurals', {}).get(nodename, None)
if singular is None:
if nodename.endswith('s'):
singular = nodename[:-1]
else:
singular = 'item'
for item in data:
node = self._to_xml_node(doc, metadata, singular, item)
result.appendChild(node)
#TODO(bcwaldon): accomplish this without a type-check
elif type(data) is dict:
collections = metadata.get('dict_collections', {})
if nodename in collections:
metadata = collections[nodename]
for k, v in data.items():
node = doc.createElement(metadata['item_name'])
node.setAttribute(metadata['item_key'], str(k))
text = doc.createTextNode(str(v))
node.appendChild(text)
result.appendChild(node)
return result
attrs = metadata.get('attributes', {}).get(nodename, {})
for k, v in data.items():
if k in attrs:
result.setAttribute(k, str(v))
else:
node = self._to_xml_node(doc, metadata, k, v)
result.appendChild(node)
else:
# Type is atom
node = doc.createTextNode(str(data))
result.appendChild(node)
return result
def _create_link_nodes(self, xml_doc, links):
link_nodes = []
for link in links:
link_node = xml_doc.createElement('atom:link')
link_node.setAttribute('rel', link['rel'])
link_node.setAttribute('href', link['href'])
if 'type' in link:
link_node.setAttribute('type', link['type'])
link_nodes.append(link_node)
return link_nodes
class ResponseHeadersSerializer(ActionDispatcher):
"""Default response headers serialization."""
def serialize(self, response, data, action):
self.dispatch(response, data, action=action)
def default(self, response, data):
response.status_int = 200
class ResponseSerializer(object):
"""Encode the necessary pieces into a response object."""
def __init__(self, body_serializers=None, headers_serializer=None):
self.body_serializers = {
'application/xml': XMLDictSerializer(),
'application/json': JSONDictSerializer(),
}
self.body_serializers.update(body_serializers or {})
self.headers_serializer = (headers_serializer or
ResponseHeadersSerializer())
def serialize(self, response_data, content_type, action='default'):
"""Serialize a dict into a string and wrap in a wsgi.Request object.
:param response_data: dict produced by the Controller
:param content_type: expected mimetype of serialized response body
"""
response = webob.Response()
self.serialize_headers(response, response_data, action)
self.serialize_body(response, response_data, content_type, action)
return response
def serialize_headers(self, response, data, action):
self.headers_serializer.serialize(response, data, action)
def serialize_body(self, response, data, content_type, action):
response.headers['Content-Type'] = content_type
if data is not None:
serializer = self.get_body_serializer(content_type)
response.body = serializer.serialize(data, action)
def get_body_serializer(self, content_type):
try:
return self.body_serializers[content_type]
except (KeyError, TypeError):
raise InvalidContentType(content_type=content_type)
class RequestHeadersDeserializer(ActionDispatcher):
"""Default request headers deserializer"""
def deserialize(self, request, action):
return self.dispatch(request, action=action)
def default(self, request):
return {}
class RequestDeserializer(object):
"""Break up a Request object into more useful pieces."""
def __init__(self, body_deserializers=None, headers_deserializer=None,
supported_content_types=None):
self.supported_content_types = supported_content_types
self.body_deserializers = {
'application/xml': XMLDeserializer(),
'application/json': JSONDeserializer(),
}
self.body_deserializers.update(body_deserializers or {})
self.headers_deserializer = (headers_deserializer or
RequestHeadersDeserializer())
def deserialize(self, request):
"""Extract necessary pieces of the request.
:param request: Request object
:returns: tuple of (expected controller action name, dictionary of
keyword arguments to pass to the controller, the expected
content type of the response)
"""
action_args = self.get_action_args(request.environ)
action = action_args.pop('action', None)
action_args.update(self.deserialize_headers(request, action))
action_args.update(self.deserialize_body(request, action))
accept = self.get_expected_content_type(request)
return (action, action_args, accept)
def deserialize_headers(self, request, action):
return self.headers_deserializer.deserialize(request, action)
def deserialize_body(self, request, action):
if not request.body:
LOG.debug(_("Empty body provided in request"))
return {}
try:
content_type = request.get_content_type()
except InvalidContentType:
LOG.debug(_("Unrecognized Content-Type provided in request"))
raise
if content_type is None:
LOG.debug(_("No Content-Type provided in request"))
return {}
try:
deserializer = self.get_body_deserializer(content_type)
except InvalidContentType:
LOG.debug(_("Unable to deserialize body as provided Content-Type"))
raise
return deserializer.deserialize(request.body, action)
def get_body_deserializer(self, content_type):
try:
return self.body_deserializers[content_type]
except (KeyError, TypeError):
raise InvalidContentType(content_type=content_type)
def get_expected_content_type(self, request):
return request.best_match_content_type(self.supported_content_types)
def get_action_args(self, request_environment):
"""Parse dictionary created by routes library."""
try:
args = request_environment['wsgiorg.routing_args'][1].copy()
except Exception:
return {}
try:
del args['controller']
except KeyError:
pass
try:
del args['format']
except KeyError:
pass
return args
class TextDeserializer(ActionDispatcher):
"""Default request body deserialization."""
def deserialize(self, datastring, action='default'):
return self.dispatch(datastring, action=action)
def default(self, datastring):
return {}
class JSONDeserializer(TextDeserializer):
def _from_json(self, datastring):
try:
return jsonutils.loads(datastring)
except ValueError:
msg = _("cannot understand JSON")
raise MalformedRequestBody(reason=msg)
def default(self, datastring):
return {'body': self._from_json(datastring)}
class XMLDeserializer(TextDeserializer):
def __init__(self, metadata=None):
"""Initiates XMLDeserializer object.
:param metadata: information needed to deserialize xml into
a dictionary.
"""
super(XMLDeserializer, self).__init__()
self.metadata = metadata or {}
def _from_xml(self, datastring):
plurals = set(self.metadata.get('plurals', {}))
try:
node = xmlutils.safe_minidom_parse_string(datastring).childNodes[0]
return {node.nodeName: self._from_xml_node(node, plurals)}
except expat.ExpatError:
msg = _("cannot understand XML")
raise MalformedRequestBody(reason=msg)
def _from_xml_node(self, node, listnames):
"""Convert a minidom node to a simple Python type.
:param listnames: list of XML node names whose subnodes should
be considered list items.
"""
if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3:
return node.childNodes[0].nodeValue
elif node.nodeName in listnames:
return [self._from_xml_node(n, listnames) for n in node.childNodes]
else:
result = dict()
for attr in node.attributes.keys():
result[attr] = node.attributes[attr].nodeValue
for child in node.childNodes:
if child.nodeType != node.TEXT_NODE:
result[child.nodeName] = self._from_xml_node(child,
listnames)
return result
def find_first_child_named(self, parent, name):
"""Search a nodes children for the first child with a given name."""
for node in parent.childNodes:
if node.nodeName == name:
return node
return None
def find_children_named(self, parent, name):
"""Return all of a nodes children who have the given name."""
for node in parent.childNodes:
if node.nodeName == name:
yield node
def extract_text(self, node):
"""Get the text field contained by the given node."""
if len(node.childNodes) == 1:
child = node.childNodes[0]
if child.nodeType == child.TEXT_NODE:
return child.nodeValue
return ""
def default(self, datastring):
return {'body': self._from_xml(datastring)}

View File

@@ -24,6 +24,8 @@ import sys
import time
import traceback
import six
from ceilometer.openstack.common.gettextutils import _ # noqa
@@ -65,7 +67,7 @@ class save_and_reraise_exception(object):
self.tb))
return False
if self.reraise:
raise self.type_, self.value, self.tb
six.reraise(self.type_, self.value, self.tb)
def forever_retry_uncaught_exceptions(infunc):
@@ -77,7 +79,8 @@ def forever_retry_uncaught_exceptions(infunc):
try:
return infunc(*args, **kwargs)
except Exception as exc:
if exc.message == last_exc_message:
this_exc_message = six.u(str(exc))
if this_exc_message == last_exc_message:
exc_count += 1
else:
exc_count = 1
@@ -85,12 +88,12 @@
# the exception message changes
cur_time = int(time.time())
if (cur_time - last_log_time > 60 or
exc.message != last_exc_message):
this_exc_message != last_exc_message):
logging.exception(
_('Unexpected exception occurred %d time(s)... '
'retrying.') % exc_count)
last_log_time = cur_time
last_exc_message = exc.message
last_exc_message = this_exc_message
exc_count = 0
# This should be a very rare event. In case it isn't, do
# a sleep.
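
A hedged sketch of wrapping a long-running worker with the decorator above (the worker body is hypothetical); after this change, repeats are detected by comparing message text instead of the deprecated exc.message attribute:

from ceilometer.openstack.common import excutils


@excutils.forever_retry_uncaught_exceptions
def run_worker():
    while True:
        process_next_item()  # hypothetical; any uncaught exception is
                             # logged (rate-limited to once a minute for
                             # a repeating message) and the loop restarts

run_worker()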

View File

@@ -19,6 +19,7 @@
import contextlib
import errno
import os
import tempfile
from ceilometer.openstack.common import excutils
from ceilometer.openstack.common.gettextutils import _ # noqa
@@ -69,33 +70,34 @@ def read_cached_file(filename, force_reload=False):
return (reloaded, cache_info['data'])
def delete_if_exists(path):
def delete_if_exists(path, remove=os.unlink):
"""Delete a file, but ignore file not found error.
:param path: File to delete
:param remove: Optional function to remove passed path
"""
try:
os.unlink(path)
remove(path)
except OSError as e:
if e.errno == errno.ENOENT:
return
else:
if e.errno != errno.ENOENT:
raise
@contextlib.contextmanager
def remove_path_on_error(path):
def remove_path_on_error(path, remove=delete_if_exists):
"""Protect code that wants to operate on PATH atomically.
Any exception will cause PATH to be removed.
:param path: File to work with
:param remove: Optional function to remove passed path
"""
try:
yield
except Exception:
with excutils.save_and_reraise_exception():
delete_if_exists(path)
remove(path)
def file_open(*args, **kwargs):
@@ -108,3 +110,30 @@ def file_open(*args, **kwargs):
state at all (for unit tests)
"""
return file(*args, **kwargs)
def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):
"""Create temporary file or use existing file.
This util is needed for creating a temporary file with
specified content, suffix and prefix. If path is not None,
it will be used for writing content. If the path doesn't
exist it'll be created.
:param content: content for temporary file.
:param path: same as parameter 'dir' for mkstemp
:param suffix: same as parameter 'suffix' for mkstemp
:param prefix: same as parameter 'prefix' for mkstemp
For example: it can be used in database tests for creating
configuration files.
"""
if path:
ensure_tree(path)
(fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix)
try:
os.write(fd, content)
finally:
os.close(fd)
return path
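
For example, a test can materialize a throwaway config file and clean it up with the delete_if_exists() helper defined earlier in this module (the contents and consumer are illustrative):

path = write_to_tempfile('[DEFAULT]\nsqlite = sqlite://\n',
                         suffix='.conf', prefix='test-migrations-')
try:
    run_migration_tests(path)  # hypothetical consumer of the config
finally:
    delete_if_exists(path)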

View File

@@ -17,6 +17,7 @@
# under the License.
import fixtures
from oslo.config import cfg
import six
class Config(fixtures.Fixture):
@@ -41,5 +42,5 @@ class Config(fixtures.Fixture):
def config(self, **kw):
group = kw.pop('group', None)
for k, v in kw.iteritems():
for k, v in six.iteritems(kw):
self.conf.set_override(k, v, group)

View File

@@ -26,10 +26,13 @@ Usual usage in an openstack.common module:
import copy
import gettext
import logging.handlers
import logging
import os
import re
import UserString
try:
import UserString as _userString
except ImportError:
import collections as _userString
from babel import localedata
import six
@@ -37,7 +40,7 @@ import six
_localedir = os.environ.get('ceilometer'.upper() + '_LOCALEDIR')
_t = gettext.translation('ceilometer', localedir=_localedir, fallback=True)
_AVAILABLE_LANGUAGES = []
_AVAILABLE_LANGUAGES = {}
USE_LAZY = False
@@ -57,6 +60,8 @@ def _(msg):
if USE_LAZY:
return Message(msg, 'ceilometer')
else:
if six.PY3:
return _t.gettext(msg)
return _t.ugettext(msg)
@@ -102,24 +107,28 @@ def install(domain, lazy=False):
"""
return Message(msg, domain)
import __builtin__
__builtin__.__dict__['_'] = _lazy_gettext
from six import moves
moves.builtins.__dict__['_'] = _lazy_gettext
else:
localedir = '%s_LOCALEDIR' % domain.upper()
gettext.install(domain,
localedir=os.environ.get(localedir),
unicode=True)
if six.PY3:
gettext.install(domain,
localedir=os.environ.get(localedir))
else:
gettext.install(domain,
localedir=os.environ.get(localedir),
unicode=True)
class Message(UserString.UserString, object):
class Message(_userString.UserString, object):
"""Class used to encapsulate translatable messages."""
def __init__(self, msg, domain):
# _msg is the gettext msgid and should never change
self._msg = msg
self._left_extra_msg = ''
self._right_extra_msg = ''
self._locale = None
self.params = None
self.locale = None
self.domain = domain
@property
@@ -139,8 +148,13 @@ class Message(UserString.UserString, object):
localedir=localedir,
fallback=True)
if six.PY3:
ugettext = lang.gettext
else:
ugettext = lang.ugettext
full_msg = (self._left_extra_msg +
lang.ugettext(self._msg) +
ugettext(self._msg) +
self._right_extra_msg)
if self.params is not None:
@@ -148,6 +162,33 @@ class Message(UserString.UserString, object):
return six.text_type(full_msg)
@property
def locale(self):
return self._locale
@locale.setter
def locale(self, value):
self._locale = value
if not self.params:
return
# This Message object may have been constructed with one or more
# Message objects as substitution parameters, given as a single
# Message, or a tuple or Map containing some, so when setting the
# locale for this Message we need to set it for those Messages too.
if isinstance(self.params, Message):
self.params.locale = value
return
if isinstance(self.params, tuple):
for param in self.params:
if isinstance(param, Message):
param.locale = value
return
if isinstance(self.params, dict):
for param in self.params.values():
if isinstance(param, Message):
param.locale = value
def _save_dictionary_parameter(self, dict_param):
full_msg = self.data
# look for %(blah) fields in string;
@@ -166,7 +207,7 @@ class Message(UserString.UserString, object):
params[key] = copy.deepcopy(dict_param[key])
except TypeError:
# cast uncopyable thing to unicode string
params[key] = unicode(dict_param[key])
params[key] = six.text_type(dict_param[key])
return params
@@ -185,7 +226,7 @@ class Message(UserString.UserString, object):
try:
self.params = copy.deepcopy(other)
except TypeError:
self.params = unicode(other)
self.params = six.text_type(other)
return self
@@ -194,11 +235,13 @@ class Message(UserString.UserString, object):
return self.data
def __str__(self):
if six.PY3:
return self.__unicode__()
return self.data.encode('utf-8')
def __getstate__(self):
to_copy = ['_msg', '_right_extra_msg', '_left_extra_msg',
'domain', 'params', 'locale']
'domain', 'params', '_locale']
new_dict = self.__dict__.fromkeys(to_copy)
for attr in to_copy:
new_dict[attr] = copy.deepcopy(self.__dict__[attr])
@@ -252,7 +295,7 @@ class Message(UserString.UserString, object):
if name in ops:
return getattr(self.data, name)
else:
return UserString.UserString.__getattribute__(self, name)
return _userString.UserString.__getattribute__(self, name)
def get_available_languages(domain):
@@ -260,8 +303,8 @@ def get_available_languages(domain):
:param domain: the domain to get languages for
"""
if _AVAILABLE_LANGUAGES:
return _AVAILABLE_LANGUAGES
if domain in _AVAILABLE_LANGUAGES:
return copy.copy(_AVAILABLE_LANGUAGES[domain])
localedir = '%s_LOCALEDIR' % domain.upper()
find = lambda x: gettext.find(domain,
@@ -270,7 +313,7 @@ def get_available_languages(domain):
# NOTE(mrodden): en_US should always be available (and first in case
# order matters) since our in-line message strings are en_US
_AVAILABLE_LANGUAGES.append('en_US')
language_list = ['en_US']
# NOTE(luisg): Babel <1.0 used a function called list(), which was
# renamed to locale_identifiers() in >=1.0, the requirements master list
# requires >=0.9.6, uncapped, so defensively work with both. We can remove
@@ -280,16 +323,17 @@ def get_available_languages(domain):
locale_identifiers = list_identifiers()
for i in locale_identifiers:
if find(i) is not None:
_AVAILABLE_LANGUAGES.append(i)
return _AVAILABLE_LANGUAGES
language_list.append(i)
_AVAILABLE_LANGUAGES[domain] = language_list
return copy.copy(language_list)
def get_localized_message(message, user_locale):
"""Gets a localized version of the given message in the given locale."""
if (isinstance(message, Message)):
if isinstance(message, Message):
if user_locale:
message.locale = user_locale
return unicode(message)
return six.text_type(message)
else:
return message

View File

@@ -38,14 +38,19 @@ import functools
import inspect
import itertools
import json
import types
import xmlrpclib
try:
import xmlrpclib
except ImportError:
# NOTE(jd): xmlrpclib is not shipped with Python 3
xmlrpclib = None
import netaddr
import six
from ceilometer.openstack.common import gettextutils
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import timeutils
netaddr = importutils.try_import("netaddr")
_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
inspect.isfunction, inspect.isgeneratorfunction,
@@ -53,7 +58,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
inspect.isabstract]
_simple_types = (types.NoneType, int, basestring, bool, float, long)
_simple_types = (six.string_types + six.integer_types
+ (type(None), bool, float))
def to_primitive(value, convert_instances=False, convert_datetime=True,
@@ -125,11 +131,13 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly
# handled
if isinstance(value, xmlrpclib.DateTime):
if xmlrpclib and isinstance(value, xmlrpclib.DateTime):
value = datetime.datetime(*tuple(value.timetuple())[:6])
if convert_datetime and isinstance(value, datetime.datetime):
return timeutils.strtime(value)
elif isinstance(value, gettextutils.Message):
return value.data
elif hasattr(value, 'iteritems'):
return recursive(dict(value.iteritems()), level=level + 1)
elif hasattr(value, '__iter__'):
@@ -138,7 +146,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# Likely an instance of something. Watch for cycles.
# Ignore class member vars.
return recursive(value.__dict__, level=level + 1)
elif isinstance(value, netaddr.IPAddress):
elif netaddr and isinstance(value, netaddr.IPAddress):
return six.text_type(value)
else:
if any(test(value) for test in _nasty_type_tests):
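
A sketch of the guarded conversions above; the Sample class is made up, and datetimes are rendered via timeutils.strtime():

import datetime


class Sample(object):
    def __init__(self):
        self.name = 'cpu'
        self.taken_at = datetime.datetime(2013, 10, 15, 9, 29, 0)

print(to_primitive(Sample(), convert_instances=True))
# {'name': 'cpu', 'taken_at': '2013-10-15T09:29:00.000000'} (order varies)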

View File

@@ -15,16 +15,15 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Greenthread local storage of variables using weak references"""
"""Local storage of variables using weak references"""
import threading
import weakref
from eventlet import corolocal
class WeakLocal(corolocal.local):
class WeakLocal(threading.local):
def __getattribute__(self, attr):
rval = corolocal.local.__getattribute__(self, attr)
rval = super(WeakLocal, self).__getattribute__(attr)
if rval:
# NOTE(mikal): this bit is confusing. What is stored is a weak
# reference, not the value itself. We therefore need to lookup
@@ -34,7 +33,7 @@ class WeakLocal(corolocal.local):
def __setattr__(self, attr, value):
value = weakref.ref(value)
return corolocal.local.__setattr__(self, attr, value)
return super(WeakLocal, self).__setattr__(attr, value)
# NOTE(mikal): the name "store" should be deprecated in the future
@@ -45,4 +44,4 @@ store = WeakLocal()
# "strong" store will hold a reference to the object so that it never falls out
# of scope.
weak_store = WeakLocal()
strong_store = corolocal.local
strong_store = threading.local()
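
A sketch of the weak-reference semantics the module keeps after dropping eventlet (the class and attribute names are illustrative):

from ceilometer.openstack.common import local


class Context(object):
    pass

ctx = Context()
local.store.context = ctx          # stored as weakref.ref(ctx)
assert local.store.context is ctx  # dereferenced on access
del ctx                            # last strong reference gone ...
print(local.store.context)         # ... so this yields None once collected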

View File

@@ -20,10 +20,11 @@ import contextlib
import errno
import functools
import os
import threading
import time
import weakref
from eventlet import semaphore
import fixtures
from oslo.config import cfg
from ceilometer.openstack.common import fileutils
@@ -137,7 +138,8 @@ _semaphores = weakref.WeakValueDictionary()
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
This function yields a `semaphore.Semaphore` instance unless external is
This function yields a `threading.Semaphore` instance (if we don't use
eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
True, in which case it'll yield an InterProcessLock instance.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
@@ -155,7 +157,7 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None):
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
sem = _semaphores.get(name, threading.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
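
A usage sketch for the context manager documented above (the lock name and critical section are hypothetical); only the semaphore class backing the internal lock changed, not the calling convention:

from ceilometer.openstack.common import lockutils

with lockutils.lock('sample-db', lock_file_prefix='ceilometer-',
                    external=True):
    update_shared_state()  # hypothetical critical section, serialized
                           # across processes by an InterProcessLock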
@@ -274,3 +276,36 @@ def synchronized_with_prefix(lock_file_prefix):
"""
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
class LockFixture(fixtures.Fixture):
"""External locking fixture.
This fixture is basically an alternative to the synchronized decorator with
the external flag so that tearDowns and addCleanups will be included in
the lock context for locking between tests. The fixture is recommended to
be the first line in a test method, like so::
def test_method(self):
self.useFixture(LockFixture)
...
or the first line in setUp if all the test methods in the class are
required to be serialized. Something like::
class TestCase(testtools.testcase):
def setUp(self):
self.useFixture(LockFixture)
super(TestCase, self).setUp()
...
This is because addCleanups are put on a LIFO queue that gets run after the
test method exits (either by completing or raising an exception).
"""
def __init__(self, name, lock_file_prefix=None):
self.mgr = lock(name, lock_file_prefix, True)
def setUp(self):
super(LockFixture, self).setUp()
self.addCleanup(self.mgr.__exit__, None, None, None)
self.mgr.__enter__()

View File

@@ -39,6 +39,7 @@ import sys
import traceback
from oslo.config import cfg
import six
from six import moves
from ceilometer.openstack.common.gettextutils import _ # noqa
@ -207,6 +208,8 @@ def _get_log_file_path(binary=None):
binary = binary or _get_binary_name()
return '%s.log' % (os.path.join(logdir, binary),)
return None
class BaseLoggerAdapter(logging.LoggerAdapter):
@ -249,6 +252,13 @@ class ContextAdapter(BaseLoggerAdapter):
self.warn(stdmsg, *args, **kwargs)
def process(self, msg, kwargs):
# NOTE(mrodden): catch any Message/other object and
# coerce it to unicode before it can get
# to the Python logging machinery and possibly
# cause string encoding trouble
if not isinstance(msg, six.string_types):
msg = six.text_type(msg)
if 'extra' not in kwargs:
kwargs['extra'] = {}
extra = kwargs['extra']
@ -260,14 +270,14 @@ class ContextAdapter(BaseLoggerAdapter):
extra.update(_dictify_context(context))
instance = kwargs.pop('instance', None)
instance_uuid = (extra.get('instance_uuid', None) or
kwargs.pop('instance_uuid', None))
instance_extra = ''
if instance:
instance_extra = CONF.instance_format % instance
else:
instance_uuid = kwargs.pop('instance_uuid', None)
if instance_uuid:
instance_extra = (CONF.instance_uuid_format
% {'uuid': instance_uuid})
elif instance_uuid:
instance_extra = (CONF.instance_uuid_format
% {'uuid': instance_uuid})
extra.update({'instance': instance_extra})
extra.update({"project": self.project})
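The precedence in process() is: an explicit instance kwarg wins; otherwise an instance_uuid, whether carried in the adapter's extra or passed at the call site, is rendered through instance_uuid_format. A hedged call-site sketch, assuming the default instance_uuid_format of '[instance: %(uuid)s] ' (logger name and UUID are illustrative)::

    from ceilometer.openstack.common import log as logging

    LOG = logging.getLogger(__name__)


    def handle_event(event):
        # The record comes out prefixed with '[instance: 15b5...] '.
        LOG.info('processing event %(event)s', {'event': event},
                 instance_uuid='15b5a7a5-6c01-4f45-b1f3-5d1234567890')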

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack LLC.
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -57,6 +57,8 @@ class RequestNotifier(base.Middleware):
def __init__(self, app, **conf):
self.service_name = conf.get('service_name', None)
self.ignore_req_list = [x.upper().strip() for x in
conf.get('ignore_req_list', '').split(',')]
super(RequestNotifier, self).__init__(app)
@staticmethod
@ -109,13 +111,16 @@ class RequestNotifier(base.Middleware):
@webob.dec.wsgify
def __call__(self, req):
self.process_request(req)
try:
response = req.get_response(self.application)
except Exception:
type, value, traceback = sys.exc_info()
self.process_response(req, None, value, traceback)
raise
if req.method in self.ignore_req_list:
return req.get_response(self.application)
else:
self.process_response(req, response)
return response
self.process_request(req)
try:
response = req.get_response(self.application)
except Exception:
type, value, traceback = sys.exc_info()
self.process_response(req, None, value, traceback)
raise
else:
self.process_response(req, response)
return response

View File

@ -25,7 +25,7 @@ CONF = cfg.CONF
def notify(_context, message):
"""Notifies the recipient of the desired event given the model.
Log notifications using openstack's default logging system.
Log notifications using OpenStack's default logging system.
"""
priority = message.get('priority',

View File

@ -24,7 +24,7 @@ LOG = logging.getLogger(__name__)
notification_topic_opt = cfg.ListOpt(
'notification_topics', default=['notifications', ],
help='AMQP topic used for openstack notifications')
help='AMQP topic used for OpenStack notifications')
CONF = cfg.CONF
CONF.register_opt(notification_topic_opt)
@ -43,4 +43,5 @@ def notify(context, message):
rpc.notify(context, topic, message)
except Exception:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())
"Payload=%(message)s"),
{"topic": topic, "message": message})

View File

@ -26,7 +26,7 @@ LOG = logging.getLogger(__name__)
notification_topic_opt = cfg.ListOpt(
'topics', default=['notifications', ],
help='AMQP topic(s) used for openstack notifications')
help='AMQP topic(s) used for OpenStack notifications')
opt_group = cfg.OptGroup(name='rpc_notifier2',
title='Options for rpc_notifier2')
@ -49,4 +49,5 @@ def notify(context, message):
rpc.notify(context, topic, message, envelope=True)
except Exception:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())
"Payload=%(message)s"),
{"topic": topic, "message": message})

View File

@ -221,7 +221,7 @@ class Enforcer(object):
if policy_file:
return policy_file
raise cfg.ConfigFilesNotFoundError((CONF.policy_file,))
raise cfg.ConfigFilesNotFoundError((self.policy_file,))
def enforce(self, rule, target, creds, do_raise=False,
exc=None, *args, **kwargs):
@ -279,11 +279,10 @@ class Enforcer(object):
return result
@six.add_metaclass(abc.ABCMeta)
class BaseCheck(object):
"""Abstract base class for Check classes."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __str__(self):
"""String representation of the Check tree rooted at this node."""
@ -449,7 +448,6 @@ class OrCheck(BaseCheck):
for rule in self.rules:
if rule(target, cred, enforcer):
return True
return False
def add_check(self, rule):
@ -627,6 +625,7 @@ def reducer(*tokens):
return decorator
@six.add_metaclass(ParseStateMeta)
class ParseState(object):
"""Implement the core of parsing the policy language.
@ -639,8 +638,6 @@ class ParseState(object):
shouldn't be that big a problem.
"""
__metaclass__ = ParseStateMeta
def __init__(self):
"""Initialize the ParseState."""
@ -846,7 +843,13 @@ class GenericCheck(Check):
"""
# TODO(termie): do dict inspection via dot syntax
match = self.match % target
try:
match = self.match % target
except KeyError:
# If the key is missing from the target,
# the check cannot match, so return False
return False
if self.kind in creds:
return match == six.text_type(creds[self.kind])
return False
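To make the new guard concrete, a small sketch (rule and values are illustrative): the target supplies the substitution values, and a missing key now means "no match" instead of a KeyError::

    from ceilometer.openstack.common import policy

    # The rule "tenant:%(tenant_id)s" parses into this check.
    check = policy.GenericCheck('tenant', '%(tenant_id)s')
    creds = {'tenant': 't-123'}

    assert check({'tenant_id': 't-123'}, creds, None)      # key present: match
    assert not check({'tenant_id': 't-456'}, creds, None)  # value differs
    assert not check({}, creds, None)                      # key absent: False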

View File

@ -56,8 +56,7 @@ rpc_opts = [
help='Seconds to wait before a cast expires (TTL). '
'Only supported by impl_zmq.'),
cfg.ListOpt('allowed_rpc_exception_modules',
default=['ceilometer.openstack.common.exception',
'nova.exception',
default=['nova.exception',
'cinder.exception',
'exceptions',
],

View File

@ -29,6 +29,7 @@ from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import local
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import versionutils
CONF = cfg.CONF
@ -441,19 +442,15 @@ def client_exceptions(*exceptions):
return outer
# TODO(sirp): we should deprecate this in favor of
# using `versionutils.is_compatible` directly
def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible.
:param imp_version: The version implemented
:param version: The version requested by an incoming message.
"""
version_parts = version.split('.')
imp_version_parts = imp_version.split('.')
if int(version_parts[0]) != int(imp_version_parts[0]): # Major
return False
if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
return False
return True
return versionutils.is_compatible(version, imp_version)
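Note the argument order: the RPC helper takes (imp_version, version) while versionutils.is_compatible takes (requested, current), hence the swap in the delegation. A quick sketch, assuming the helper lives in ceilometer.openstack.common.rpc.common::

    from ceilometer.openstack.common.rpc import common as rpc_common

    # A 1.2 implementation can serve a 1.0 request...
    assert rpc_common.version_is_compatible('1.2', '1.0')
    # ...but not a newer minor or a different major version.
    assert not rpc_common.version_is_compatible('1.2', '1.3')
    assert not rpc_common.version_is_compatible('2.0', '1.0')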
def serialize_msg(raw_msg):

View File

@ -15,11 +15,12 @@
"""Provides the definition of an RPC serialization handler"""
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Serializer(object):
"""Generic (de-)serialization definition base class."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def serialize_entity(self, context, entity):

View File

@ -221,6 +221,9 @@ class ProcessLauncher(object):
status = None
signo = 0
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fall back into the loop spawning children. It would
# be bad for a child to spawn more children.
try:
launcher.wait()
except SignalExit as exc:
@ -273,9 +276,6 @@ class ProcessLauncher(object):
pid = os.fork()
if pid == 0:
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fall back into the loop spawning children. It would
# be bad for a child to spawn more children.
launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()

View File

@ -0,0 +1,53 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010-2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Common utilities used in testing"""
import os
import fixtures
import testtools
class BaseTestCase(testtools.TestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
self._set_timeout()
self._fake_output()
self.useFixture(fixtures.FakeLogger('ceilometer.openstack.common'))
self.useFixture(fixtures.NestedTempfile())
def _set_timeout(self):
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
try:
test_timeout = int(test_timeout)
except ValueError:
# If the timeout value is invalid, do not set a timeout.
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
def _fake_output(self):
if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
os.environ.get('OS_STDOUT_CAPTURE') == '1'):
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
os.environ.get('OS_STDERR_CAPTURE') == '1'):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
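A hedged sketch of a consumer, assuming the new file lands as ceilometer.openstack.common.test; the environment variables are read once per test::

    from ceilometer.openstack.common import test


    class TestSomething(test.BaseTestCase):
        def test_quick(self):
            # With OS_TEST_TIMEOUT=60 exported, this test is stopped
            # (gently) after 60 seconds; with OS_STDOUT_CAPTURE=1 or
            # 'True', print output is captured per test instead of
            # leaking to the console.
            self.assertTrue(True)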

View File

@ -21,6 +21,7 @@ Time related utilities and helper functions.
import calendar
import datetime
import time
import iso8601
import six
@ -49,9 +50,9 @@ def parse_isotime(timestr):
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(e.message)
raise ValueError(unicode(e))
except TypeError as e:
raise ValueError(e.message)
raise ValueError(unicode(e))
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
@ -90,6 +91,11 @@ def is_newer_than(after, seconds):
def utcnow_ts():
"""Timestamp version of our utcnow function."""
if utcnow.override_time is None:
# NOTE(kgriffs): This is several times faster
# than going through calendar.timegm(...)
return int(time.time())
return calendar.timegm(utcnow().timetuple())
@ -111,12 +117,15 @@ def iso8601_from_timestamp(timestamp):
utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()):
def set_time_override(override_time=None):
"""Overrides utils.utcnow.
Make it return a constant time or a list thereof, one at a time.
:param override_time: datetime instance or list thereof. If not
given, defaults to the current UTC time.
"""
utcnow.override_time = override_time
utcnow.override_time = override_time or datetime.datetime.utcnow()
def advance_time_delta(timedelta):
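With the mutable-default fix, the fallback time is computed when set_time_override is called rather than once at import. A short sketch of the override lifecycle (the chosen instant is illustrative)::

    import datetime

    from ceilometer.openstack.common import timeutils

    frozen = datetime.datetime(2013, 10, 15, 9, 29, 0)
    timeutils.set_time_override(frozen)
    assert timeutils.utcnow() == frozen

    timeutils.advance_time_delta(datetime.timedelta(seconds=30))
    assert timeutils.utcnow().second == 30

    timeutils.clear_time_override()  # back to the real clock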

View File

@ -0,0 +1,45 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Helpers for comparing version strings.
"""
import pkg_resources
def is_compatible(requested_version, current_version, same_major=True):
"""Determine whether `requested_version` is satisfied by
`current_version`; in other words, `current_version` is >=
`requested_version`.
:param requested_version: version to check for compatibility
:param current_version: version to check against
:param same_major: if True, the major version must be identical between
`requested_version` and `current_version`. This is used when a
major-version difference indicates incompatibility between the two
versions. Since this is the common-case in practice, the default is
True.
:returns: True if compatible, False if not
"""
requested_parts = pkg_resources.parse_version(requested_version)
current_parts = pkg_resources.parse_version(current_version)
if same_major and (requested_parts[0] != current_parts[0]):
return False
return current_parts >= requested_parts
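Concretely (version strings are illustrative)::

    from ceilometer.openstack.common import versionutils

    assert versionutils.is_compatible('1.1', '1.3')      # 1.3 satisfies 1.1
    assert not versionutils.is_compatible('1.3', '1.1')  # 1.1 is too old
    assert not versionutils.is_compatible('2.0', '1.9')  # major mismatch
    assert versionutils.is_compatible('1.9', '2.0',
                                      same_major=False)  # plain >= check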

View File

@ -0,0 +1,74 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from xml.dom import minidom
from xml.parsers import expat
from xml import sax
from xml.sax import expatreader
class ProtectedExpatParser(expatreader.ExpatParser):
"""An expat parser which disables DTD's and entities by default."""
def __init__(self, forbid_dtd=True, forbid_entities=True,
*args, **kwargs):
# Python 2.x old style class
expatreader.ExpatParser.__init__(self, *args, **kwargs)
self.forbid_dtd = forbid_dtd
self.forbid_entities = forbid_entities
def start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
raise ValueError("Inline DTD forbidden")
def entity_decl(self, entityName, is_parameter_entity, value, base,
systemId, publicId, notationName):
raise ValueError("<!ENTITY> entity declaration forbidden")
def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
# expat 1.2
raise ValueError("<!ENTITY> unparsed entity forbidden")
def external_entity_ref(self, context, base, systemId, publicId):
raise ValueError("<!ENTITY> external entity forbidden")
def notation_decl(self, name, base, sysid, pubid):
raise ValueError("<!ENTITY> notation forbidden")
def reset(self):
expatreader.ExpatParser.reset(self)
if self.forbid_dtd:
self._parser.StartDoctypeDeclHandler = self.start_doctype_decl
self._parser.EndDoctypeDeclHandler = None
if self.forbid_entities:
self._parser.EntityDeclHandler = self.entity_decl
self._parser.UnparsedEntityDeclHandler = self.unparsed_entity_decl
self._parser.ExternalEntityRefHandler = self.external_entity_ref
self._parser.NotationDeclHandler = self.notation_decl
try:
self._parser.SkippedEntityHandler = None
except AttributeError:
# some pyexpat versions do not support SkippedEntity
pass
def safe_minidom_parse_string(xml_string):
"""Parse an XML string using minidom safely.
"""
try:
return minidom.parseString(xml_string, parser=ProtectedExpatParser())
except sax.SAXParseException:
raise expat.ExpatError()
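A behavioural sketch, assuming the file lands as ceilometer.openstack.common.xmlutils (payloads are illustrative): ordinary XML parses as usual, while a document declaring a DTD or entities is refused before any expansion can happen::

    from xml.parsers import expat

    from ceilometer.openstack.common import xmlutils

    doc = xmlutils.safe_minidom_parse_string('<node>ok</node>')
    assert doc.documentElement.tagName == 'node'

    # A "billion laughs" style payload is rejected, not expanded.
    evil = '<!DOCTYPE bomb [<!ENTITY a "aaaa">]><bomb>&a;</bomb>'
    try:
        xmlutils.safe_minidom_parse_string(evil)
    except (ValueError, expat.ExpatError):
        pass  # the inline DTD/entity declaration was forbidden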

View File

@ -117,6 +117,19 @@
#sqlite_synchronous=true
#
# Options defined in ceilometer.openstack.common.deprecated.wsgi
#
# Number of backlog requests to configure the socket with
# (integer value)
#backlog=4096
# Sets the value of TCP_KEEPIDLE in seconds for each server
# socket. Not supported on OS X. (integer value)
#tcp_keepidle=600
#
# Options defined in ceilometer.openstack.common.eventlet_backdoor
#
@ -224,6 +237,15 @@
#syslog_log_facility=LOG_USER
#
# Options defined in ceilometer.openstack.common.middleware.sizelimit
#
# The maximum body size per request, in bytes (integer
# value)
#max_request_body_size=114688
#
# Options defined in ceilometer.openstack.common.notifier.api
#
@ -245,7 +267,7 @@
# Options defined in ceilometer.openstack.common.notifier.rpc_notifier
#
# AMQP topic used for openstack notifications (list value)
# AMQP topic used for OpenStack notifications (list value)
#notification_topics=notifications
@ -285,7 +307,7 @@
# Modules of exceptions that are permitted to be recreated upon
# receiving exception data from an rpc call. (list value)
#allowed_rpc_exception_modules=ceilometer.openstack.common.exception,nova.exception,cinder.exception,exceptions
#allowed_rpc_exception_modules=nova.exception,cinder.exception,exceptions
# If passed, use a fake RabbitMQ provider (boolean value)
#fake_rabbit=false
@ -637,7 +659,7 @@
# Options defined in ceilometer.openstack.common.notifier.rpc_notifier2
#
# AMQP topic(s) used for openstack notifications (list value)
# AMQP topic(s) used for OpenStack notifications (list value)
#topics=notifications