Sync OpenStack commons with oslo-incubator

Change-Id: Ia37f4301f149f03201e6ebe286b1256ac036d6bf
This commit is contained in:
Sergey Lukjanov 2013-08-16 13:10:54 +04:00
parent c1a75046da
commit 8cd821e37a
29 changed files with 1313 additions and 144 deletions

View File

@ -0,0 +1,179 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
from Crypto.Hash import HMAC
from Crypto import Random
from savanna.openstack.common.gettextutils import _ # noqa
from savanna.openstack.common import importutils
class CryptoutilsException(Exception):
"""Generic Exception for Crypto utilities."""
message = _("An unknown error occurred in crypto utils.")
class CipherBlockLengthTooBig(CryptoutilsException):
"""The block size is too big."""
def __init__(self, requested, permitted):
msg = _("Block size of %(given)d is too big, max = %(maximum)d")
message = msg % {'given': requested, 'maximum': permitted}
super(CryptoutilsException, self).__init__(message)
class HKDFOutputLengthTooLong(CryptoutilsException):
"""The amount of Key Material asked is too much."""
def __init__(self, requested, permitted):
msg = _("Length of %(given)d is too long, max = %(maximum)d")
message = msg % {'given': requested, 'maximum': permitted}
super(CryptoutilsException, self).__init__(message)
class HKDF(object):
"""An HMAC-based Key Derivation Function implementation (RFC5869)
This class creates an object that allows to use HKDF to derive keys.
"""
def __init__(self, hashtype='SHA256'):
self.hashfn = importutils.import_module('Crypto.Hash.' + hashtype)
self.max_okm_length = 255 * self.hashfn.digest_size
def extract(self, ikm, salt=None):
"""An extract function that can be used to derive a robust key given
weak Input Key Material (IKM) which could be a password.
Returns a pseudorandom key (of HashLen octets)
:param ikm: input keying material (ex a password)
:param salt: optional salt value (a non-secret random value)
"""
if salt is None:
salt = '\x00' * self.hashfn.digest_size
return HMAC.new(salt, ikm, self.hashfn).digest()
def expand(self, prk, info, length):
"""An expand function that will return arbitrary length output that can
be used as keys.
Returns a buffer usable as key material.
:param prk: a pseudorandom key of at least HashLen octets
:param info: optional string (can be a zero-length string)
:param length: length of output keying material (<= 255 * HashLen)
"""
if length > self.max_okm_length:
raise HKDFOutputLengthTooLong(length, self.max_okm_length)
N = (length + self.hashfn.digest_size - 1) / self.hashfn.digest_size
okm = ""
tmp = ""
for block in range(1, N + 1):
tmp = HMAC.new(prk, tmp + info + chr(block), self.hashfn).digest()
okm += tmp
return okm[:length]
MAX_CB_SIZE = 256
class SymmetricCrypto(object):
"""Symmetric Key Crypto object.
This class creates a Symmetric Key Crypto object that can be used
to encrypt, decrypt, or sign arbitrary data.
:param enctype: Encryption Cipher name (default: AES)
:param hashtype: Hash/HMAC type name (default: SHA256)
"""
def __init__(self, enctype='AES', hashtype='SHA256'):
self.cipher = importutils.import_module('Crypto.Cipher.' + enctype)
self.hashfn = importutils.import_module('Crypto.Hash.' + hashtype)
def new_key(self, size):
return Random.new().read(size)
def encrypt(self, key, msg, b64encode=True):
"""Encrypt the provided msg and returns the cyphertext optionally
base64 encoded.
Uses AES-128-CBC with a Random IV by default.
The plaintext is padded to reach blocksize length.
The last byte of the block is the length of the padding.
The length of the padding does not include the length byte itself.
:param key: The Encryption key.
:param msg: the plain text.
:returns encblock: a block of encrypted data.
"""
iv = Random.new().read(self.cipher.block_size)
cipher = self.cipher.new(key, self.cipher.MODE_CBC, iv)
# CBC mode requires a fixed block size. Append padding and length of
# padding.
if self.cipher.block_size > MAX_CB_SIZE:
raise CipherBlockLengthTooBig(self.cipher.block_size, MAX_CB_SIZE)
r = len(msg) % self.cipher.block_size
padlen = self.cipher.block_size - r - 1
msg += '\x00' * padlen
msg += chr(padlen)
enc = iv + cipher.encrypt(msg)
if b64encode:
enc = base64.b64encode(enc)
return enc
def decrypt(self, key, msg, b64decode=True):
"""Decrypts the provided ciphertext, optionally base 64 encoded, and
returns the plaintext message, after padding is removed.
Uses AES-128-CBC with an IV by default.
:param key: The Encryption key.
:param msg: the ciphetext, the first block is the IV
"""
if b64decode:
msg = base64.b64decode(msg)
iv = msg[:self.cipher.block_size]
cipher = self.cipher.new(key, self.cipher.MODE_CBC, iv)
padded = cipher.decrypt(msg[self.cipher.block_size:])
l = ord(padded[-1]) + 1
plain = padded[:-l]
return plain
def sign(self, key, msg, b64encode=True):
"""Signs a message string and returns a base64 encoded signature.
Uses HMAC-SHA-256 by default.
:param key: The Signing key.
:param msg: the message to sign.
"""
h = HMAC.new(key, msg, self.hashfn)
out = h.digest()
if b64encode:
out = base64.b64encode(out)
return out

View File

@ -43,3 +43,9 @@ class DBDeadlock(DBError):
class DBInvalidUnicodeParameter(Exception):
message = _("Invalid Parameter: "
"Unicode is not supported by the current database.")
class DbMigrationError(DBError):
"""Wraps migration specific exception."""
def __init__(self, message=None):
super(DbMigrationError, self).__init__(str(message))

View File

@ -0,0 +1,278 @@
# coding: utf-8
#
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Base on code in migrate/changeset/databases/sqlite.py which is under
# the following license:
#
# The MIT License
#
# Copyright (c) 2009 Evan Rosson, Jan Dittberner, Domen Kožar
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
import distutils.version as dist_version
import os
import re
import migrate
from migrate.changeset import ansisql
from migrate.changeset.databases import sqlite
from migrate.versioning import util as migrate_util
import sqlalchemy
from sqlalchemy.schema import UniqueConstraint
from savanna.openstack.common.db import exception
from savanna.openstack.common.db.sqlalchemy import session as db_session
from savanna.openstack.common.gettextutils import _ # noqa
@migrate_util.decorator
def patched_with_engine(f, *a, **kw):
url = a[0]
engine = migrate_util.construct_engine(url, **kw)
try:
kw['engine'] = engine
return f(*a, **kw)
finally:
if isinstance(engine, migrate_util.Engine) and engine is not url:
migrate_util.log.debug('Disposing SQLAlchemy engine %s', engine)
engine.dispose()
# TODO(jkoelker) When migrate 0.7.3 is released and nova depends
# on that version or higher, this can be removed
MIN_PKG_VERSION = dist_version.StrictVersion('0.7.3')
if (not hasattr(migrate, '__version__') or
dist_version.StrictVersion(migrate.__version__) < MIN_PKG_VERSION):
migrate_util.with_engine = patched_with_engine
# NOTE(jkoelker) Delay importing migrate until we are patched
from migrate import exceptions as versioning_exceptions
from migrate.versioning import api as versioning_api
from migrate.versioning.repository import Repository
_REPOSITORY = None
get_engine = db_session.get_engine
def _get_unique_constraints(self, table):
"""Retrieve information about existing unique constraints of the table
This feature is needed for _recreate_table() to work properly.
Unfortunately, it's not available in sqlalchemy 0.7.x/0.8.x.
"""
data = table.metadata.bind.execute(
"""SELECT sql
FROM sqlite_master
WHERE
type='table' AND
name=:table_name""",
table_name=table.name
).fetchone()[0]
UNIQUE_PATTERN = "CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)"
return [
UniqueConstraint(
*[getattr(table.columns, c.strip(' "')) for c in cols.split(",")],
name=name
)
for name, cols in re.findall(UNIQUE_PATTERN, data)
]
def _recreate_table(self, table, column=None, delta=None, omit_uniques=None):
"""Recreate the table properly
Unlike the corresponding original method of sqlalchemy-migrate this one
doesn't drop existing unique constraints when creating a new one.
"""
table_name = self.preparer.format_table(table)
# we remove all indexes so as not to have
# problems during copy and re-create
for index in table.indexes:
index.drop()
# reflect existing unique constraints
for uc in self._get_unique_constraints(table):
table.append_constraint(uc)
# omit given unique constraints when creating a new table if required
table.constraints = set([
cons for cons in table.constraints
if omit_uniques is None or cons.name not in omit_uniques
])
self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name)
self.execute()
insertion_string = self._modify_table(table, column, delta)
table.create(bind=self.connection)
self.append(insertion_string % {'table_name': table_name})
self.execute()
self.append('DROP TABLE migration_tmp')
self.execute()
def _visit_migrate_unique_constraint(self, *p, **k):
"""Drop the given unique constraint
The corresponding original method of sqlalchemy-migrate just
raises NotImplemented error
"""
self.recreate_table(p[0].table, omit_uniques=[p[0].name])
def patch_migrate():
"""A workaround for SQLite's inability to alter things
SQLite abilities to alter tables are very limited (please read
http://www.sqlite.org/lang_altertable.html for more details).
E. g. one can't drop a column or a constraint in SQLite. The
workaround for this is to recreate the original table omitting
the corresponding constraint (or column).
sqlalchemy-migrate library has recreate_table() method that
implements this workaround, but it does it wrong:
- information about unique constraints of a table
is not retrieved. So if you have a table with one
unique constraint and a migration adding another one
you will end up with a table that has only the
latter unique constraint, and the former will be lost
- dropping of unique constraints is not supported at all
The proper way to fix this is to provide a pull-request to
sqlalchemy-migrate, but the project seems to be dead. So we
can go on with monkey-patching of the lib at least for now.
"""
# this patch is needed to ensure that recreate_table() doesn't drop
# existing unique constraints of the table when creating a new one
helper_cls = sqlite.SQLiteHelper
helper_cls.recreate_table = _recreate_table
helper_cls._get_unique_constraints = _get_unique_constraints
# this patch is needed to be able to drop existing unique constraints
constraint_cls = sqlite.SQLiteConstraintDropper
constraint_cls.visit_migrate_unique_constraint = \
_visit_migrate_unique_constraint
constraint_cls.__bases__ = (ansisql.ANSIColumnDropper,
sqlite.SQLiteConstraintGenerator)
def db_sync(abs_path, version=None, init_version=0):
"""Upgrade or downgrade a database.
Function runs the upgrade() or downgrade() functions in change scripts.
:param abs_path: Absolute path to migrate repository.
:param version: Database will upgrade/downgrade until this version.
If None - database will update to the latest
available version.
:param init_version: Initial database version
"""
if version is not None:
try:
version = int(version)
except ValueError:
raise exception.DbMigrationError(
message=_("version should be an integer"))
current_version = db_version(abs_path, init_version)
repository = _find_migrate_repo(abs_path)
if version is None or version > current_version:
return versioning_api.upgrade(get_engine(), repository, version)
else:
return versioning_api.downgrade(get_engine(), repository,
version)
def db_version(abs_path, init_version):
"""Show the current version of the repository.
:param abs_path: Absolute path to migrate repository
:param version: Initial database version
"""
repository = _find_migrate_repo(abs_path)
try:
return versioning_api.db_version(get_engine(), repository)
except versioning_exceptions.DatabaseNotControlledError:
meta = sqlalchemy.MetaData()
engine = get_engine()
meta.reflect(bind=engine)
tables = meta.tables
if len(tables) == 0:
db_version_control(abs_path, init_version)
return versioning_api.db_version(get_engine(), repository)
else:
# Some pre-Essex DB's may not be version controlled.
# Require them to upgrade using Essex first.
raise exception.DbMigrationError(
message=_("Upgrade DB using Essex release first."))
def db_version_control(abs_path, version=None):
"""Mark a database as under this repository's version control.
Once a database is under version control, schema changes should
only be done via change scripts in this repository.
:param abs_path: Absolute path to migrate repository
:param version: Initial database version
"""
repository = _find_migrate_repo(abs_path)
versioning_api.version_control(get_engine(), repository, version)
return version
def _find_migrate_repo(abs_path):
"""Get the project's change script repository
:param abs_path: Absolute path to migrate repository
"""
global _REPOSITORY
if not os.path.exists(abs_path):
raise exception.DbMigrationError("Path %s not found" % abs_path)
if _REPOSITORY is None:
_REPOSITORY = Repository(abs_path)
return _REPOSITORY

View File

@ -22,6 +22,8 @@
SQLAlchemy models.
"""
import six
from sqlalchemy import Column, Integer
from sqlalchemy import DateTime
from sqlalchemy.orm import object_mapper
@ -70,12 +72,12 @@ class ModelBase(object):
return self
def next(self):
n = self._i.next()
n = six.advance_iterator(self._i)
return n, getattr(self, n)
def update(self, values):
"""Make the model object behave like a dict."""
for k, v in values.iteritems():
for k, v in six.iteritems(values):
setattr(self, k, v)
def iteritems(self):
@ -84,7 +86,7 @@ class ModelBase(object):
Includes attributes from joins.
"""
local = dict(self)
joined = dict([(k, v) for k, v in self.__dict__.iteritems()
joined = dict([(k, v) for k, v in six.iteritems(self.__dict__)
if not k[0] == '_'])
local.update(joined)
return local.iteritems()

View File

@ -279,13 +279,11 @@ database_opts = [
deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_connection',
group='DATABASE')],
secret=True),
group='DATABASE')]),
cfg.StrOpt('slave_connection',
default='',
help='The SQLAlchemy connection string used to connect to the '
'slave database',
secret=True),
'slave database'),
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
@ -478,6 +476,11 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name):
if engine_name not in ["mysql", "sqlite", "postgresql"]:
return
# FIXME(johannes): The usage of the .message attribute has been
# deprecated since Python 2.6. However, the exceptions raised by
# SQLAlchemy can differ when using unicode() and accessing .message.
# An audit across all three supported engines will be necessary to
# ensure there are no regressions.
m = _DUP_KEY_RE_DB[engine_name].match(integrity_error.message)
if not m:
return
@ -510,6 +513,11 @@ def _raise_if_deadlock_error(operational_error, engine_name):
re = _DEADLOCK_RE_DB.get(engine_name)
if re is None:
return
# FIXME(johannes): The usage of the .message attribute has been
# deprecated since Python 2.6. However, the exceptions raised by
# SQLAlchemy can differ when using unicode() and accessing .message.
# An audit across all three supported engines will be necessary to
# ensure there are no regressions.
m = re.match(operational_error.message)
if not m:
return

61
savanna/openstack/common/db/sqlalchemy/utils.py Executable file → Normal file
View File

@ -18,6 +18,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import re
from migrate.changeset import UniqueConstraint
import sqlalchemy
from sqlalchemy import Boolean
from sqlalchemy import CheckConstraint
@ -37,13 +40,21 @@ from sqlalchemy.types import NullType
from savanna.openstack.common.gettextutils import _ # noqa
from savanna.openstack.common import exception
from savanna.openstack.common import log as logging
from savanna.openstack.common import timeutils
LOG = logging.getLogger(__name__)
_DBURL_REGEX = re.compile(r"[^:]+://([^:]+):([^@]+)@.+")
def sanitize_db_url(url):
match = _DBURL_REGEX.match(url)
if match:
return '%s****:****%s' % (url[:match.start(1)], url[match.end(2):])
return url
class InvalidSortKey(Exception):
message = _("Sort key supplied was not valid.")
@ -174,6 +185,10 @@ def visit_insert_from_select(element, compiler, **kw):
compiler.process(element.select))
class ColumnError(Exception):
"""Error raised when no column or an invalid column is found."""
def _get_not_supported_column(col_name_col_instance, column_name):
try:
column = col_name_col_instance[column_name]
@ -181,16 +196,53 @@ def _get_not_supported_column(col_name_col_instance, column_name):
msg = _("Please specify column %s in col_name_col_instance "
"param. It is required because column has unsupported "
"type by sqlite).")
raise exception.OpenstackException(message=msg % column_name)
raise ColumnError(msg % column_name)
if not isinstance(column, Column):
msg = _("col_name_col_instance param has wrong type of "
"column instance for column %s It should be instance "
"of sqlalchemy.Column.")
raise exception.OpenstackException(message=msg % column_name)
raise ColumnError(msg % column_name)
return column
def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
**col_name_col_instance):
"""Drop unique constraint from table.
This method drops UC from table and works for mysql, postgresql and sqlite.
In mysql and postgresql we are able to use "alter table" construction.
Sqlalchemy doesn't support some sqlite column types and replaces their
type with NullType in metadata. We process these columns and replace
NullType with the correct column type.
:param migrate_engine: sqlalchemy engine
:param table_name: name of table that contains uniq constraint.
:param uc_name: name of uniq constraint that will be dropped.
:param columns: columns that are in uniq constraint.
:param col_name_col_instance: contains pair column_name=column_instance.
column_instance is instance of Column. These params
are required only for columns that have unsupported
types by sqlite. For example BigInteger.
"""
meta = MetaData()
meta.bind = migrate_engine
t = Table(table_name, meta, autoload=True)
if migrate_engine.name == "sqlite":
override_cols = [
_get_not_supported_column(col_name_col_instance, col.name)
for col in t.columns
if isinstance(col.type, NullType)
]
for col in override_cols:
t.columns.replace(col)
uc = UniqueConstraint(*columns, table=t, name=uc_name)
uc.drop()
def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
use_soft_delete, *uc_column_names):
"""Drop all old rows having the same values for columns in uc_columns.
@ -248,8 +300,7 @@ def _get_default_deleted_value(table):
return 0
if isinstance(table.c.id.type, String):
return ""
raise exception.OpenstackException(
message=_("Unsupported id columns type"))
raise ColumnError(_("Unsupported id columns type"))
def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes):

View File

@ -33,7 +33,7 @@ class Error(Exception):
class ApiError(Error):
def __init__(self, message='Unknown', code='Unknown'):
self.message = message
self.api_message = message
self.code = code
super(ApiError, self).__init__('%s: %s' % (code, message))
@ -44,19 +44,19 @@ class NotFound(Error):
class UnknownScheme(Error):
msg = "Unknown scheme '%s' found in URI"
msg_fmt = "Unknown scheme '%s' found in URI"
def __init__(self, scheme):
msg = self.__class__.msg % scheme
msg = self.msg_fmt % scheme
super(UnknownScheme, self).__init__(msg)
class BadStoreUri(Error):
msg = "The Store URI %s was malformed. Reason: %s"
msg_fmt = "The Store URI %s was malformed. Reason: %s"
def __init__(self, uri, reason):
msg = self.__class__.msg % (uri, reason)
msg = self.msg_fmt % (uri, reason)
super(BadStoreUri, self).__init__(msg)
@ -100,9 +100,7 @@ def wrap_exception(f):
return f(*args, **kw)
except Exception as e:
if not isinstance(e, Error):
#exc_type, exc_value, exc_traceback = sys.exc_info()
logging.exception(_('Uncaught exception'))
#logging.error(traceback.extract_stack(exc_traceback))
raise Error(str(e))
raise
_wrap.func_name = f.func_name
@ -113,29 +111,29 @@ class OpenstackException(Exception):
"""Base Exception class.
To correctly use this class, inherit from it and define
a 'message' property. That message will get printf'd
a 'msg_fmt' property. That message will get printf'd
with the keyword arguments provided to the constructor.
"""
message = "An unknown exception occurred"
msg_fmt = "An unknown exception occurred"
def __init__(self, **kwargs):
try:
self._error_string = self.message % kwargs
self._error_string = self.msg_fmt % kwargs
except Exception:
if _FATAL_EXCEPTION_FORMAT_ERRORS:
raise
else:
# at least get the core message out if something happened
self._error_string = self.message
self._error_string = self.msg_fmt
def __str__(self):
return self._error_string
class MalformedRequestBody(OpenstackException):
message = "Malformed message body: %(reason)s"
msg_fmt = "Malformed message body: %(reason)s"
class InvalidContentType(OpenstackException):
message = "Invalid content type %(content_type)s"
msg_fmt = "Invalid content type %(content_type)s"

View File

@ -77,7 +77,8 @@ def forever_retry_uncaught_exceptions(infunc):
try:
return infunc(*args, **kwargs)
except Exception as exc:
if exc.message == last_exc_message:
this_exc_message = unicode(exc)
if this_exc_message == last_exc_message:
exc_count += 1
else:
exc_count = 1
@ -85,12 +86,12 @@ def forever_retry_uncaught_exceptions(infunc):
# the exception message changes
cur_time = int(time.time())
if (cur_time - last_log_time > 60 or
exc.message != last_exc_message):
this_exc_message != last_exc_message):
logging.exception(
_('Unexpected exception occurred %d time(s)... '
'retrying.') % exc_count)
last_log_time = cur_time
last_exc_message = exc.message
last_exc_message = this_exc_message
exc_count = 0
# This should be a very rare event. In case it isn't, do
# a sleep.

View File

@ -1,8 +1,8 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Red Hat, Inc.
# All Rights Reserved.
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -31,17 +31,36 @@ import os
import re
import UserString
from babel import localedata
import six
_localedir = os.environ.get('savanna'.upper() + '_LOCALEDIR')
_t = gettext.translation('savanna', localedir=_localedir, fallback=True)
_AVAILABLE_LANGUAGES = []
USE_LAZY = False
def enable_lazy():
"""Convenience function for configuring _() to use lazy gettext
Call this at the start of execution to enable the gettextutils._
function to use lazy gettext functionality. This is useful if
your project is importing _ directly instead of using the
gettextutils.install() way of importing the _ function.
"""
global USE_LAZY
USE_LAZY = True
def _(msg):
if USE_LAZY:
return Message(msg, 'savanna')
else:
return _t.ugettext(msg)
def install(domain):
def install(domain, lazy=False):
"""Install a _() function using the given translation domain.
Given a translation domain, install a _() function using gettext's
@ -51,41 +70,45 @@ def install(domain):
overriding the default localedir (e.g. /usr/share/locale) using
a translation-domain-specific environment variable (e.g.
NOVA_LOCALEDIR).
:param domain: the translation domain
:param lazy: indicates whether or not to install the lazy _() function.
The lazy _() introduces a way to do deferred translation
of messages by installing a _ that builds Message objects,
instead of strings, which can then be lazily translated into
any available locale.
"""
gettext.install(domain,
localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
unicode=True)
"""
Lazy gettext functionality.
The following is an attempt to introduce a deferred way
to do translations on messages in OpenStack. We attempt to
override the standard _() function and % (format string) operation
to build Message objects that can later be translated when we have
more information. Also included is an example LogHandler that
translates Messages to an associated locale, effectively allowing
many logs, each with their own locale.
"""
def get_lazy_gettext(domain):
"""Assemble and return a lazy gettext function for a given domain.
Factory method for a project/module to get a lazy gettext function
for its own translation domain (i.e. nova, glance, cinder, etc.)
"""
if lazy:
# NOTE(mrodden): Lazy gettext functionality.
#
# The following introduces a deferred way to do translations on
# messages in OpenStack. We override the standard _() function
# and % (format string) operation to build Message objects that can
# later be translated when we have more information.
#
# Also included below is an example LocaleHandler that translates
# Messages to an associated locale, effectively allowing many logs,
# each with their own locale.
def _lazy_gettext(msg):
"""Create and return a Message object.
Message encapsulates a string so that we can translate it later when
needed.
Lazy gettext function for a given domain, it is a factory method
for a project/module to get a lazy gettext function for its own
translation domain (i.e. nova, glance, cinder, etc.)
Message encapsulates a string so that we can translate
it later when needed.
"""
return Message(msg, domain)
return _lazy_gettext
import __builtin__
__builtin__.__dict__['_'] = _lazy_gettext
else:
localedir = '%s_LOCALEDIR' % domain.upper()
gettext.install(domain,
localedir=os.environ.get(localedir),
unicode=True)
class Message(UserString.UserString, object):
@ -130,7 +153,7 @@ class Message(UserString.UserString, object):
# look for %(blah) fields in string;
# ignore %% and deal with the
# case where % is first character on the line
keys = re.findall('(?:[^%]|^)%\((\w*)\)[a-z]', full_msg)
keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', full_msg)
# if we don't find any %(blah) blocks but have a %s
if not keys and re.findall('(?:[^%]|^)%[a-z]', full_msg):
@ -232,6 +255,45 @@ class Message(UserString.UserString, object):
return UserString.UserString.__getattribute__(self, name)
def get_available_languages(domain):
"""Lists the available languages for the given translation domain.
:param domain: the domain to get languages for
"""
if _AVAILABLE_LANGUAGES:
return _AVAILABLE_LANGUAGES
localedir = '%s_LOCALEDIR' % domain.upper()
find = lambda x: gettext.find(domain,
localedir=os.environ.get(localedir),
languages=[x])
# NOTE(mrodden): en_US should always be available (and first in case
# order matters) since our in-line message strings are en_US
_AVAILABLE_LANGUAGES.append('en_US')
# NOTE(luisg): Babel <1.0 used a function called list(), which was
# renamed to locale_identifiers() in >=1.0, the requirements master list
# requires >=0.9.6, uncapped, so defensively work with both. We can remove
# this check when the master list updates to >=1.0, and all projects udpate
list_identifiers = (getattr(localedata, 'list', None) or
getattr(localedata, 'locale_identifiers'))
locale_identifiers = list_identifiers()
for i in locale_identifiers:
if find(i) is not None:
_AVAILABLE_LANGUAGES.append(i)
return _AVAILABLE_LANGUAGES
def get_localized_message(message, user_locale):
"""Gets a localized version of the given message in the given locale."""
if (isinstance(message, Message)):
if user_locale:
message.locale = user_locale
return unicode(message)
else:
return message
class LocaleHandler(logging.Handler):
"""Handler that can have a locale associated to translate Messages.

View File

@ -15,16 +15,15 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Greenthread local storage of variables using weak references"""
"""Local storage of variables using weak references"""
import threading
import weakref
from eventlet import corolocal
class WeakLocal(corolocal.local):
class WeakLocal(threading.local):
def __getattribute__(self, attr):
rval = corolocal.local.__getattribute__(self, attr)
rval = super(WeakLocal, self).__getattribute__(attr)
if rval:
# NOTE(mikal): this bit is confusing. What is stored is a weak
# reference, not the value itself. We therefore need to lookup
@ -34,7 +33,7 @@ class WeakLocal(corolocal.local):
def __setattr__(self, attr, value):
value = weakref.ref(value)
return corolocal.local.__setattr__(self, attr, value)
return super(WeakLocal, self).__setattr__(attr, value)
# NOTE(mikal): the name "store" should be deprecated in the future
@ -45,4 +44,4 @@ store = WeakLocal()
# "strong" store will hold a reference to the object so that it never falls out
# of scope.
weak_store = WeakLocal()
strong_store = corolocal.local
strong_store = threading.local()

View File

@ -29,8 +29,6 @@ It also allows setting of formatting information through conf.
"""
import ConfigParser
import cStringIO
import inspect
import itertools
import logging
@ -41,6 +39,7 @@ import sys
import traceback
from oslo.config import cfg
from six import moves
from savanna.openstack.common.gettextutils import _ # noqa
from savanna.openstack.common import importutils
@ -348,7 +347,7 @@ class LogConfigError(Exception):
def _load_log_config(log_config):
try:
logging.config.fileConfig(log_config)
except ConfigParser.Error as exc:
except moves.configparser.Error as exc:
raise LogConfigError(log_config, str(exc))
@ -521,7 +520,7 @@ class ContextFormatter(logging.Formatter):
if not record:
return logging.Formatter.formatException(self, exc_info)
stringbuffer = cStringIO.StringIO()
stringbuffer = moves.StringIO()
traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
None, stringbuffer)
lines = stringbuffer.getvalue().split('\n')

View File

@ -28,11 +28,7 @@ class Middleware(object):
@classmethod
def factory(cls, global_conf, **local_conf):
"""Factory method for paste.deploy."""
def filter(app):
return cls(app)
return filter
return cls
def __init__(self, application):
self.application = application

View File

@ -25,7 +25,7 @@ CONF = cfg.CONF
def notify(_context, message):
"""Notifies the recipient of the desired event given the model.
Log notifications using openstack's default logging system.
Log notifications using OpenStack's default logging system.
"""
priority = message.get('priority',

View File

@ -24,7 +24,7 @@ LOG = logging.getLogger(__name__)
notification_topic_opt = cfg.ListOpt(
'notification_topics', default=['notifications', ],
help='AMQP topic used for openstack notifications')
help='AMQP topic used for OpenStack notifications')
CONF = cfg.CONF
CONF.register_opt(notification_topic_opt)

View File

@ -26,7 +26,7 @@ LOG = logging.getLogger(__name__)
notification_topic_opt = cfg.ListOpt(
'topics', default=['notifications', ],
help='AMQP topic(s) used for openstack notifications')
help='AMQP topic(s) used for OpenStack notifications')
opt_group = cfg.OptGroup(name='rpc_notifier2',
title='Options for rpc_notifier2')

View File

@ -56,8 +56,7 @@ rpc_opts = [
help='Seconds to wait before a cast expires (TTL). '
'Only supported by impl_zmq.'),
cfg.ListOpt('allowed_rpc_exception_modules',
default=['savanna.openstack.common.exception',
'nova.exception',
default=['nova.exception',
'cinder.exception',
'exceptions',
],

View File

@ -300,8 +300,13 @@ def pack_context(msg, context):
for args at some point.
"""
if isinstance(context, dict):
context_d = dict([('_context_%s' % key, value)
for (key, value) in context.iteritems()])
else:
context_d = dict([('_context_%s' % key, value)
for (key, value) in context.to_dict().iteritems()])
msg.update(context_d)

View File

@ -490,12 +490,8 @@ class Connection(object):
# future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
if not ssl_params:
# Just have the default behavior
return True
else:
# Return the extended behavior
return ssl_params
# Return the extended behavior or just have the default behavior
return ssl_params or True
def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have

View File

@ -238,7 +238,7 @@ class FanoutConsumer(ConsumerBase):
{"exclusive": True})
def reconnect(self, session):
topic = self.get_node_name()
topic = self.get_node_name().rpartition('_fanout')[0]
params = {
'session': session,
'topic': topic,
@ -320,7 +320,7 @@ class DirectPublisher(Publisher):
def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id,
{"type": "Direct"})
{"type": "direct"})
class TopicPublisher(Publisher):

View File

@ -383,6 +383,7 @@ class ZmqBaseReactor(ConsumerBase):
LOG.info(_("In reactor registered"))
def consume_in_thread(self):
@excutils.forever_retry_uncaught_exceptions
def _consume(sock):
LOG.info(_("Consuming socket"))
while True:

View File

@ -248,9 +248,7 @@ class DirectBinding(Binding):
that it maps directly to a host, thus direct.
"""
def test(self, key):
if '.' in key:
return True
return False
return '.' in key
class TopicBinding(Binding):
@ -262,17 +260,13 @@ class TopicBinding(Binding):
matches that of a direct exchange.
"""
def test(self, key):
if '.' not in key:
return True
return False
return '.' not in key
class FanoutBinding(Binding):
"""Match on fanout keys, where key starts with 'fanout.' string."""
def test(self, key):
if key.startswith('fanout~'):
return True
return False
return key.startswith('fanout~')
class StubExchange(Exchange):

View File

@ -63,9 +63,7 @@ class RingExchange(mm.Exchange):
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
if key in self.ring0:
return True
return False
return key in self.ring0
class RoundRobinRingExchange(RingExchange):

View File

@ -0,0 +1,521 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import collections
import os
import struct
import time
import requests
from oslo.config import cfg
from savanna.openstack.common.crypto import utils as cryptoutils
from savanna.openstack.common import jsonutils
from savanna.openstack.common import log as logging
secure_message_opts = [
cfg.BoolOpt('enabled', default=True,
help='Whether Secure Messaging (Signing) is enabled,'
' defaults to enabled'),
cfg.BoolOpt('enforced', default=False,
help='Whether Secure Messaging (Signing) is enforced,'
' defaults to not enforced'),
cfg.BoolOpt('encrypt', default=False,
help='Whether Secure Messaging (Encryption) is enabled,'
' defaults to not enabled'),
cfg.StrOpt('secret_keys_file',
help='Path to the file containing the keys, takes precedence'
' over secret_key'),
cfg.MultiStrOpt('secret_key',
help='A list of keys: (ex: name:<base64 encoded key>),'
' ignored if secret_keys_file is set'),
cfg.StrOpt('kds_endpoint',
help='KDS endpoint (ex: http://kds.example.com:35357/v3)'),
]
secure_message_group = cfg.OptGroup('secure_messages',
title='Secure Messaging options')
LOG = logging.getLogger(__name__)
class SecureMessageException(Exception):
"""Generic Exception for Secure Messages."""
msg = "An unknown Secure Message related exception occurred."
def __init__(self, msg=None):
if msg is None:
msg = self.msg
super(SecureMessageException, self).__init__(msg)
class SharedKeyNotFound(SecureMessageException):
"""No shared key was found and no other external authentication mechanism
is available.
"""
msg = "Shared Key for [%s] Not Found. (%s)"
def __init__(self, name, errmsg):
super(SharedKeyNotFound, self).__init__(self.msg % (name, errmsg))
class InvalidMetadata(SecureMessageException):
"""The metadata is invalid."""
msg = "Invalid metadata: %s"
def __init__(self, err):
super(InvalidMetadata, self).__init__(self.msg % err)
class InvalidSignature(SecureMessageException):
"""Signature validation failed."""
msg = "Failed to validate signature (source=%s, destination=%s)"
def __init__(self, src, dst):
super(InvalidSignature, self).__init__(self.msg % (src, dst))
class UnknownDestinationName(SecureMessageException):
"""The Destination name is unknown to us."""
msg = "Invalid destination name (%s)"
def __init__(self, name):
super(UnknownDestinationName, self).__init__(self.msg % name)
class InvalidEncryptedTicket(SecureMessageException):
"""The Encrypted Ticket could not be successfully handled."""
msg = "Invalid Ticket (source=%s, destination=%s)"
def __init__(self, src, dst):
super(InvalidEncryptedTicket, self).__init__(self.msg % (src, dst))
class InvalidExpiredTicket(SecureMessageException):
"""The ticket received is already expired."""
msg = "Expired ticket (source=%s, destination=%s)"
def __init__(self, src, dst):
super(InvalidExpiredTicket, self).__init__(self.msg % (src, dst))
class CommunicationError(SecureMessageException):
"""The Communication with the KDS failed."""
msg = "Communication Error (target=%s): %s"
def __init__(self, target, errmsg):
super(CommunicationError, self).__init__(self.msg % (target, errmsg))
class InvalidArgument(SecureMessageException):
"""Bad initialization argument."""
msg = "Invalid argument: %s"
def __init__(self, errmsg):
super(InvalidArgument, self).__init__(self.msg % errmsg)
Ticket = collections.namedtuple('Ticket', ['skey', 'ekey', 'esek'])
class KeyStore(object):
"""A storage class for Signing and Encryption Keys.
This class creates an object that holds Generic Keys like Signing
Keys, Encryption Keys, Encrypted SEK Tickets ...
"""
def __init__(self):
self._kvps = dict()
def _get_key_name(self, source, target, ktype):
return (source, target, ktype)
def _put(self, src, dst, ktype, expiration, data):
name = self._get_key_name(src, dst, ktype)
self._kvps[name] = (expiration, data)
def _get(self, src, dst, ktype):
name = self._get_key_name(src, dst, ktype)
if name in self._kvps:
expiration, data = self._kvps[name]
if expiration > time.time():
return data
else:
del self._kvps[name]
return None
def clear(self):
"""Wipes the store clear of all data."""
self._kvps.clear()
def put_ticket(self, source, target, skey, ekey, esek, expiration):
"""Puts a sek pair in the cache.
:param source: Client name
:param target: Target name
:param skey: The Signing Key
:param ekey: The Encription Key
:param esek: The token encrypted with the target key
:param expiration: Expiration time in seconds since Epoch
"""
keys = Ticket(skey, ekey, esek)
self._put(source, target, 'ticket', expiration, keys)
def get_ticket(self, source, target):
"""Returns a Ticket (skey, ekey, esek) namedtuple for the
source/target pair.
"""
return self._get(source, target, 'ticket')
_KEY_STORE = KeyStore()
class _KDSClient(object):
USER_AGENT = 'oslo-incubator/rpc'
def __init__(self, endpoint=None, timeout=None):
"""A KDS Client class."""
self._endpoint = endpoint
if timeout is not None:
self.timeout = float(timeout)
else:
self.timeout = None
def _do_get(self, url, request):
req_kwargs = dict()
req_kwargs['headers'] = dict()
req_kwargs['headers']['User-Agent'] = self.USER_AGENT
req_kwargs['headers']['Content-Type'] = 'application/json'
req_kwargs['data'] = jsonutils.dumps({'request': request})
if self.timeout is not None:
req_kwargs['timeout'] = self.timeout
try:
resp = requests.get(url, **req_kwargs)
except requests.ConnectionError as e:
err = "Unable to establish connection. %s" % e
raise CommunicationError(url, err)
return resp
def _get_reply(self, url, resp):
if resp.text:
try:
body = jsonutils.loads(resp.text)
reply = body['reply']
except (KeyError, TypeError, ValueError):
msg = "Failed to decode reply: %s" % resp.text
raise CommunicationError(url, msg)
else:
msg = "No reply data was returned."
raise CommunicationError(url, msg)
return reply
def _get_ticket(self, request, url=None, redirects=10):
"""Send an HTTP request.
Wraps around 'requests' to handle redirects and common errors.
"""
if url is None:
if not self._endpoint:
raise CommunicationError(url, 'Endpoint not configured')
url = self._endpoint + '/kds/ticket'
while redirects:
resp = self._do_get(url, request)
if resp.status_code in (301, 302, 305):
# Redirected. Reissue the request to the new location.
url = resp.headers['location']
redirects -= 1
continue
elif resp.status_code != 200:
msg = "Request returned failure status: %s (%s)"
err = msg % (resp.status_code, resp.text)
raise CommunicationError(url, err)
return self._get_reply(url, resp)
raise CommunicationError(url, "Too many redirections, giving up!")
def get_ticket(self, source, target, crypto, key):
# prepare metadata
md = {'requestor': source,
'target': target,
'timestamp': time.time(),
'nonce': struct.unpack('Q', os.urandom(8))[0]}
metadata = base64.b64encode(jsonutils.dumps(md))
# sign metadata
signature = crypto.sign(key, metadata)
# HTTP request
reply = self._get_ticket({'metadata': metadata,
'signature': signature})
# verify reply
signature = crypto.sign(key, (reply['metadata'] + reply['ticket']))
if signature != reply['signature']:
raise InvalidEncryptedTicket(md['source'], md['destination'])
md = jsonutils.loads(base64.b64decode(reply['metadata']))
if ((md['source'] != source or
md['destination'] != target or
md['expiration'] < time.time())):
raise InvalidEncryptedTicket(md['source'], md['destination'])
# return ticket data
tkt = jsonutils.loads(crypto.decrypt(key, reply['ticket']))
return tkt, md['expiration']
# we need to keep a global nonce, as this value should never repeat non
# matter how many SecureMessage objects we create
_NONCE = None
def _get_nonce():
"""We keep a single counter per instance, as it is so huge we can't
possibly cycle through within 1/100 of a second anyway.
"""
global _NONCE
# Lazy initialize, for now get a random value, multiply by 2^32 and
# use it as the nonce base. The counter itself will rotate after
# 2^32 increments.
if _NONCE is None:
_NONCE = [struct.unpack('I', os.urandom(4))[0], 0]
# Increment counter and wrap at 2^32
_NONCE[1] += 1
if _NONCE[1] > 0xffffffff:
_NONCE[1] = 0
# Return base + counter
return long((_NONCE[0] * 0xffffffff)) + _NONCE[1]
class SecureMessage(object):
"""A Secure Message object.
This class creates a signing/encryption facility for RPC messages.
It encapsulates all the necessary crypto primitives to insulate
regular code from the intricacies of message authentication, validation
and optionally encryption.
:param topic: The topic name of the queue
:param host: The server name, together with the topic it forms a unique
name that is used to source signing keys, and verify
incoming messages.
:param conf: a ConfigOpts object
:param key: (optional) explicitly pass in endpoint private key.
If not provided it will be sourced from the service config
:param key_store: (optional) Storage class for local caching
:param encrypt: (defaults to False) Whether to encrypt messages
:param enctype: (defaults to AES) Cipher to use
:param hashtype: (defaults to SHA256) Hash function to use for signatures
"""
def __init__(self, topic, host, conf, key=None, key_store=None,
encrypt=None, enctype='AES', hashtype='SHA256'):
conf.register_group(secure_message_group)
conf.register_opts(secure_message_opts, group='secure_messages')
self._name = '%s.%s' % (topic, host)
self._key = key
self._conf = conf.secure_messages
self._encrypt = self._conf.encrypt if (encrypt is None) else encrypt
self._crypto = cryptoutils.SymmetricCrypto(enctype, hashtype)
self._hkdf = cryptoutils.HKDF(hashtype)
self._kds = _KDSClient(self._conf.kds_endpoint)
if self._key is None:
self._key = self._init_key(topic, self._name)
if self._key is None:
err = "Secret Key (or key file) is missing or malformed"
raise SharedKeyNotFound(self._name, err)
self._key_store = key_store or _KEY_STORE
def _init_key(self, topic, name):
keys = None
if self._conf.secret_keys_file:
with open(self._conf.secret_keys_file, 'r') as f:
keys = f.readlines()
elif self._conf.secret_key:
keys = self._conf.secret_key
if keys is None:
return None
for k in keys:
if k[0] == '#':
continue
if ':' not in k:
break
svc, key = k.split(':', 1)
if svc == topic or svc == name:
return base64.b64decode(key)
return None
def _split_key(self, key, size):
sig_key = key[:size]
enc_key = key[size:]
return sig_key, enc_key
def _decode_esek(self, key, source, target, timestamp, esek):
"""This function decrypts the esek buffer passed in and returns a
KeyStore to be used to check and decrypt the received message.
:param key: The key to use to decrypt the ticket (esek)
:param source: The name of the source service
:param traget: The name of the target service
:param timestamp: The incoming message timestamp
:param esek: a base64 encoded encrypted block containing a JSON string
"""
rkey = None
try:
s = self._crypto.decrypt(key, esek)
j = jsonutils.loads(s)
rkey = base64.b64decode(j['key'])
expiration = j['timestamp'] + j['ttl']
if j['timestamp'] > timestamp or timestamp > expiration:
raise InvalidExpiredTicket(source, target)
except Exception:
raise InvalidEncryptedTicket(source, target)
info = '%s,%s,%s' % (source, target, str(j['timestamp']))
sek = self._hkdf.expand(rkey, info, len(key) * 2)
return self._split_key(sek, len(key))
def _get_ticket(self, target):
"""This function will check if we already have a SEK for the specified
target in the cache, or will go and try to fetch a new SEK from the key
server.
:param target: The name of the target service
"""
ticket = self._key_store.get_ticket(self._name, target)
if ticket is not None:
return ticket
tkt, expiration = self._kds.get_ticket(self._name, target,
self._crypto, self._key)
self._key_store.put_ticket(self._name, target,
base64.b64decode(tkt['skey']),
base64.b64decode(tkt['ekey']),
tkt['esek'], expiration)
return self._key_store.get_ticket(self._name, target)
def encode(self, version, target, json_msg):
"""This is the main encoding function.
It takes a target and a message and returns a tuple consisting of a
JSON serialized metadata object, a JSON serialized (and optionally
encrypted) message, and a signature.
:param version: the current envelope version
:param target: The name of the target service (usually with hostname)
:param json_msg: a serialized json message object
"""
ticket = self._get_ticket(target)
metadata = jsonutils.dumps({'source': self._name,
'destination': target,
'timestamp': time.time(),
'nonce': _get_nonce(),
'esek': ticket.esek,
'encryption': self._encrypt})
message = json_msg
if self._encrypt:
message = self._crypto.encrypt(ticket.ekey, message)
signature = self._crypto.sign(ticket.skey,
version + metadata + message)
return (metadata, message, signature)
def decode(self, version, metadata, message, signature):
"""This is the main decoding function.
It takes a version, metadata, message and signature strings and
returns a tuple with a (decrypted) message and metadata or raises
an exception in case of error.
:param version: the current envelope version
:param metadata: a JSON serialized object with metadata for validation
:param message: a JSON serialized (base64 encoded encrypted) message
:param signature: a base64 encoded signature
"""
md = jsonutils.loads(metadata)
check_args = ('source', 'destination', 'timestamp',
'nonce', 'esek', 'encryption')
for arg in check_args:
if arg not in md:
raise InvalidMetadata('Missing metadata "%s"' % arg)
if md['destination'] != self._name:
# TODO(simo) handle group keys by checking target
raise UnknownDestinationName(md['destination'])
try:
skey, ekey = self._decode_esek(self._key,
md['source'], md['destination'],
md['timestamp'], md['esek'])
except InvalidExpiredTicket:
raise
except Exception:
raise InvalidMetadata('Failed to decode ESEK for %s/%s' % (
md['source'], md['destination']))
sig = self._crypto.sign(skey, version + metadata + message)
if sig != signature:
raise InvalidSignature(md['source'], md['destination'])
if md['encryption'] is True:
msg = self._crypto.decrypt(ekey, message)
else:
msg = message
return (md, msg)

View File

@ -32,10 +32,11 @@ class Service(service.Service):
A service enables rpc by listening to queues based on topic and host.
"""
def __init__(self, host, topic, manager=None):
def __init__(self, host, topic, manager=None, serializer=None):
super(Service, self).__init__()
self.host = host
self.topic = topic
self.serializer = serializer
if manager is None:
self.manager = self
else:
@ -48,7 +49,8 @@ class Service(service.Service):
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
self.serializer)
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, dispatcher, fanout=False)

1
savanna/openstack/common/rpc/zmq_receiver.py Executable file → Normal file
View File

@ -1,4 +1,3 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation

View File

@ -81,6 +81,15 @@ class Launcher(object):
"""
self.services.wait()
def restart(self):
"""Reload config files and restart service.
:returns: None
"""
cfg.CONF.reload_config_files()
self.services.restart()
class SignalExit(SystemExit):
def __init__(self, signo, exccode=1):
@ -93,24 +102,31 @@ class ServiceLauncher(Launcher):
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signo)
def wait(self):
def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGHUP, self._handle_signal)
def _wait_for_exit_or_signal(self):
status = None
signo = 0
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
status = None
try:
super(ServiceLauncher, self).wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[exc.signo]
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
finally:
@ -121,7 +137,16 @@ class ServiceLauncher(Launcher):
except Exception:
# We're shutting down, so it doesn't matter at this point.
LOG.exception(_('Exception during rpc cleanup.'))
return status, signo
def wait(self):
while True:
self.handle_signal()
status, signo = self._wait_for_exit_or_signal()
if signo != signal.SIGHUP:
return status
self.restart()
class ServiceWrapper(object):
@ -139,9 +164,12 @@ class ProcessLauncher(object):
self.running = True
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
self.handle_signal()
def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGHUP, self._handle_signal)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
@ -150,6 +178,7 @@ class ProcessLauncher(object):
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGHUP, signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
@ -160,16 +189,47 @@ class ProcessLauncher(object):
sys.exit(1)
def _child_process(self, service):
def _child_process_handle_signal(self):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)
def _sighup(*args):
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signal.SIGHUP)
signal.signal(signal.SIGTERM, _sigterm)
signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _child_wait_for_exit_or_signal(self, launcher):
status = None
signo = 0
try:
launcher.wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
launcher.stop()
return status, signo
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
@ -184,7 +244,7 @@ class ProcessLauncher(object):
launcher = Launcher()
launcher.launch_service(service)
launcher.wait()
return launcher
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
@ -205,21 +265,13 @@ class ProcessLauncher(object):
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
status = 0
try:
self._child_process(wrap.service)
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
wrap.service.stop()
launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(launcher)
if signo != signal.SIGHUP:
break
launcher.restart()
os._exit(status)
@ -265,12 +317,7 @@ class ProcessLauncher(object):
wrap.children.remove(pid)
return wrap
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
def _respawn_children(self):
while self.running:
wrap = self._wait_child()
if not wrap:
@ -279,14 +326,30 @@ class ProcessLauncher(object):
# (see bug #1095346)
eventlet.greenthread.sleep(.01)
continue
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
while True:
self.handle_signal()
self._respawn_children()
if self.sigcaught:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[self.sigcaught]
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
LOG.info(_('Caught %s, stopping children'), signame)
if self.sigcaught != signal.SIGHUP:
break
for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
for pid in self.children:
try:
@ -311,6 +374,10 @@ class Service(object):
# signal that the service is done shutting itself down:
self._done = event.Event()
def reset(self):
# NOTE(Fengqian): docs for Event.reset() recommend against using it
self._done = event.Event()
def start(self):
pass
@ -353,6 +420,13 @@ class Services(object):
def wait(self):
self.tg.wait()
def restart(self):
self.stop()
self.done = event.Event()
for restart_service in self.services:
restart_service.reset()
self.tg.add_thread(self.run_service, restart_service, self.done)
@staticmethod
def run_service(service, done):
"""Service start wrapper.

View File

@ -49,9 +49,9 @@ def parse_isotime(timestr):
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(e.message)
raise ValueError(unicode(e))
except TypeError as e:
raise ValueError(e.message)
raise ValueError(unicode(e))
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):

View File

@ -2,11 +2,12 @@
pep8==1.4.5
pyflakes==0.7.2
flake8==2.0
hacking>=0.7.0,<0.8
hacking>=0.5.6,<0.8
Babel>=0.9.6
coverage>=3.6
docutils==0.9.1
mock>=0.8.0
mock>=1.0
nose
openstack.nose_plugin>=0.7
pylint==0.25.2