Sync From OSLO
blueprint: key-distribution-server Change-Id: I424dbd6e8a535279498589444b976ea332e09c64
This commit is contained in:
parent
d004e4f9a4
commit
9473106429
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
@ -23,12 +21,11 @@ context or provide additional information in their specific WSGI pipeline.
|
||||
"""
|
||||
|
||||
import itertools
|
||||
|
||||
from keystone.openstack.common import uuidutils
|
||||
import uuid
|
||||
|
||||
|
||||
def generate_request_id():
|
||||
return 'req-%s' % uuidutils.generate_uuid()
|
||||
return 'req-%s' % str(uuid.uuid4())
|
||||
|
||||
|
||||
class RequestContext(object):
|
||||
@ -39,26 +36,46 @@ class RequestContext(object):
|
||||
accesses the system, as well as additional request information.
|
||||
"""
|
||||
|
||||
def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False,
|
||||
read_only=False, show_deleted=False, request_id=None):
|
||||
user_idt_format = '{user} {tenant} {domain} {user_domain} {p_domain}'
|
||||
|
||||
def __init__(self, auth_token=None, user=None, tenant=None, domain=None,
|
||||
user_domain=None, project_domain=None, is_admin=False,
|
||||
read_only=False, show_deleted=False, request_id=None,
|
||||
instance_uuid=None):
|
||||
self.auth_token = auth_token
|
||||
self.user = user
|
||||
self.tenant = tenant
|
||||
self.domain = domain
|
||||
self.user_domain = user_domain
|
||||
self.project_domain = project_domain
|
||||
self.is_admin = is_admin
|
||||
self.read_only = read_only
|
||||
self.show_deleted = show_deleted
|
||||
self.instance_uuid = instance_uuid
|
||||
if not request_id:
|
||||
request_id = generate_request_id()
|
||||
self.request_id = request_id
|
||||
|
||||
def to_dict(self):
|
||||
user_idt = (
|
||||
self.user_idt_format.format(user=self.user or '-',
|
||||
tenant=self.tenant or '-',
|
||||
domain=self.domain or '-',
|
||||
user_domain=self.user_domain or '-',
|
||||
p_domain=self.project_domain or '-'))
|
||||
|
||||
return {'user': self.user,
|
||||
'tenant': self.tenant,
|
||||
'domain': self.domain,
|
||||
'user_domain': self.user_domain,
|
||||
'project_domain': self.project_domain,
|
||||
'is_admin': self.is_admin,
|
||||
'read_only': self.read_only,
|
||||
'show_deleted': self.show_deleted,
|
||||
'auth_token': self.auth_token,
|
||||
'request_id': self.request_id}
|
||||
'request_id': self.request_id,
|
||||
'instance_uuid': self.instance_uuid,
|
||||
'user_identity': user_idt}
|
||||
|
||||
|
||||
def get_admin_context(show_deleted=False):
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012 Cloudscaling Group, Inc
|
||||
# All Rights Reserved.
|
||||
#
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2013 Rackspace Hosting
|
||||
# All Rights Reserved.
|
||||
#
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
@ -49,3 +47,8 @@ class DbMigrationError(DBError):
|
||||
"""Wraps migration specific exception."""
|
||||
def __init__(self, message=None):
|
||||
super(DbMigrationError, self).__init__(str(message))
|
||||
|
||||
|
||||
class DBConnectionError(DBError):
|
||||
"""Wraps connection specific exception."""
|
||||
pass
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012 Cloudscaling Group, Inc
|
||||
# All Rights Reserved.
|
||||
#
|
||||
|
@ -36,7 +36,8 @@
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
# THE SOFTWARE.
|
||||
|
||||
import distutils.version as dist_version
|
||||
import os
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
@ -41,13 +39,13 @@ class ModelBase(object):
|
||||
if not session:
|
||||
session = sa.get_session()
|
||||
# NOTE(boris-42): This part of code should be look like:
|
||||
# sesssion.add(self)
|
||||
# session.add(self)
|
||||
# session.flush()
|
||||
# But there is a bug in sqlalchemy and eventlet that
|
||||
# raises NoneType exception if there is no running
|
||||
# transaction and rollback is called. As long as
|
||||
# sqlalchemy has this bug we have to create transaction
|
||||
# explicity.
|
||||
# explicitly.
|
||||
with session.begin(subtransactions=True):
|
||||
session.add(self)
|
||||
session.flush()
|
||||
|
187
keystone/openstack/common/db/sqlalchemy/provision.py
Normal file
187
keystone/openstack/common/db/sqlalchemy/provision.py
Normal file
@ -0,0 +1,187 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Mirantis.inc
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Provision test environment for specific DB backends"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
|
||||
import sqlalchemy
|
||||
|
||||
from keystone.openstack.common.db import exception as exc
|
||||
|
||||
|
||||
SQL_CONNECTION = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION', 'sqlite://')
|
||||
|
||||
|
||||
def _gen_credentials(*names):
|
||||
"""Generate credentials."""
|
||||
auth_dict = {}
|
||||
for name in names:
|
||||
val = ''.join(random.choice(string.lowercase) for i in xrange(10))
|
||||
auth_dict[name] = val
|
||||
return auth_dict
|
||||
|
||||
|
||||
def _get_engine(uri=SQL_CONNECTION):
|
||||
"""Engine creation
|
||||
|
||||
By default the uri is SQL_CONNECTION which is admin credentials.
|
||||
Call the function without arguments to get admin connection. Admin
|
||||
connection required to create temporary user and database for each
|
||||
particular test. Otherwise use existing connection to recreate connection
|
||||
to the temporary database.
|
||||
"""
|
||||
return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool)
|
||||
|
||||
|
||||
def _execute_sql(engine, sql, driver):
|
||||
"""Initialize connection, execute sql query and close it."""
|
||||
try:
|
||||
with engine.connect() as conn:
|
||||
if driver == 'postgresql':
|
||||
conn.connection.set_isolation_level(0)
|
||||
for s in sql:
|
||||
conn.execute(s)
|
||||
except sqlalchemy.exc.OperationalError:
|
||||
msg = ('%s does not match database admin '
|
||||
'credentials or database does not exist.')
|
||||
raise exc.DBConnectionError(msg % SQL_CONNECTION)
|
||||
|
||||
|
||||
def create_database(engine):
|
||||
"""Provide temporary user and database for each particular test."""
|
||||
driver = engine.name
|
||||
|
||||
auth = _gen_credentials('database', 'user', 'passwd')
|
||||
|
||||
sqls = {
|
||||
'mysql': [
|
||||
"drop database if exists %(database)s;",
|
||||
"grant all on %(database)s.* to '%(user)s'@'localhost'"
|
||||
" identified by '%(passwd)s';",
|
||||
"create database %(database)s;",
|
||||
],
|
||||
'postgresql': [
|
||||
"drop database if exists %(database)s;",
|
||||
"drop user if exists %(user)s;",
|
||||
"create user %(user)s with password '%(passwd)s';",
|
||||
"create database %(database)s owner %(user)s;",
|
||||
]
|
||||
}
|
||||
|
||||
if driver == 'sqlite':
|
||||
return 'sqlite:////tmp/%s' % auth['database']
|
||||
|
||||
try:
|
||||
sql_rows = sqls[driver]
|
||||
except KeyError:
|
||||
raise ValueError('Unsupported RDBMS %s' % driver)
|
||||
sql_query = map(lambda x: x % auth, sql_rows)
|
||||
|
||||
_execute_sql(engine, sql_query, driver)
|
||||
|
||||
params = auth.copy()
|
||||
params['backend'] = driver
|
||||
return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
|
||||
|
||||
|
||||
def drop_database(engine, current_uri):
|
||||
"""Drop temporary database and user after each particular test."""
|
||||
engine = _get_engine(current_uri)
|
||||
admin_engine = _get_engine()
|
||||
driver = engine.name
|
||||
auth = {'database': engine.url.database, 'user': engine.url.username}
|
||||
|
||||
if driver == 'sqlite':
|
||||
try:
|
||||
os.remove(auth['database'])
|
||||
except OSError:
|
||||
pass
|
||||
return
|
||||
|
||||
sqls = {
|
||||
'mysql': [
|
||||
"drop database if exists %(database)s;",
|
||||
"drop user '%(user)s'@'localhost';",
|
||||
],
|
||||
'postgresql': [
|
||||
"drop database if exists %(database)s;",
|
||||
"drop user if exists %(user)s;",
|
||||
]
|
||||
}
|
||||
|
||||
try:
|
||||
sql_rows = sqls[driver]
|
||||
except KeyError:
|
||||
raise ValueError('Unsupported RDBMS %s' % driver)
|
||||
sql_query = map(lambda x: x % auth, sql_rows)
|
||||
|
||||
_execute_sql(admin_engine, sql_query, driver)
|
||||
|
||||
|
||||
def main():
|
||||
"""Controller to handle commands
|
||||
|
||||
::create: Create test user and database with random names.
|
||||
::drop: Drop user and database created by previous command.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Controller to handle database creation and dropping'
|
||||
' commands.',
|
||||
epilog='Under normal circumstances is not used directly.'
|
||||
' Used in .testr.conf to automate test database creation'
|
||||
' and dropping processes.')
|
||||
subparsers = parser.add_subparsers(
|
||||
help='Subcommands to manipulate temporary test databases.')
|
||||
|
||||
create = subparsers.add_parser(
|
||||
'create',
|
||||
help='Create temporary test '
|
||||
'databases and users.')
|
||||
create.set_defaults(which='create')
|
||||
create.add_argument(
|
||||
'instances_count',
|
||||
type=int,
|
||||
help='Number of databases to create.')
|
||||
|
||||
drop = subparsers.add_parser(
|
||||
'drop',
|
||||
help='Drop temporary test databases and users.')
|
||||
drop.set_defaults(which='drop')
|
||||
drop.add_argument(
|
||||
'instances',
|
||||
nargs='+',
|
||||
help='List of databases uri to be dropped.')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
engine = _get_engine()
|
||||
which = args.which
|
||||
|
||||
if which == "create":
|
||||
for i in range(int(args.instances_count)):
|
||||
print(create_database(engine))
|
||||
elif which == "drop":
|
||||
for db in args.instances:
|
||||
drop_database(engine, db)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
@ -109,7 +107,7 @@ Recommended ways to use sessions within this framework:
|
||||
filter_by(id=subq.as_scalar()).\
|
||||
update({'bar': newbar})
|
||||
|
||||
For reference, this emits approximagely the following SQL statement:
|
||||
For reference, this emits approximately the following SQL statement:
|
||||
|
||||
UPDATE bar SET bar = ${newbar}
|
||||
WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
|
||||
@ -613,7 +611,7 @@ def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
|
||||
dbapi_conn.cursor().execute('select 1')
|
||||
except dbapi_conn.OperationalError as ex:
|
||||
if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
|
||||
LOG.warn(_('Got mysql server has gone away: %s'), ex)
|
||||
LOG.warning(_('Got mysql server has gone away: %s'), ex)
|
||||
raise sqla_exc.DisconnectionError("Database server went away")
|
||||
else:
|
||||
raise
|
||||
@ -695,7 +693,7 @@ def create_engine(sql_connection, sqlite_fk=False):
|
||||
remaining = 'infinite'
|
||||
while True:
|
||||
msg = _('SQL connection failed. %s attempts left.')
|
||||
LOG.warn(msg % remaining)
|
||||
LOG.warning(msg % remaining)
|
||||
if remaining != 'infinite':
|
||||
remaining -= 1
|
||||
time.sleep(CONF.database.retry_interval)
|
||||
@ -754,25 +752,25 @@ def _patch_mysqldb_with_stacktrace_comments():
|
||||
|
||||
def _do_query(self, q):
|
||||
stack = ''
|
||||
for file, line, method, function in traceback.extract_stack():
|
||||
for filename, line, method, function in traceback.extract_stack():
|
||||
# exclude various common things from trace
|
||||
if file.endswith('session.py') and method == '_do_query':
|
||||
if filename.endswith('session.py') and method == '_do_query':
|
||||
continue
|
||||
if file.endswith('api.py') and method == 'wrapper':
|
||||
if filename.endswith('api.py') and method == 'wrapper':
|
||||
continue
|
||||
if file.endswith('utils.py') and method == '_inner':
|
||||
if filename.endswith('utils.py') and method == '_inner':
|
||||
continue
|
||||
if file.endswith('exception.py') and method == '_wrap':
|
||||
if filename.endswith('exception.py') and method == '_wrap':
|
||||
continue
|
||||
# db/api is just a wrapper around db/sqlalchemy/api
|
||||
if file.endswith('db/api.py'):
|
||||
if filename.endswith('db/api.py'):
|
||||
continue
|
||||
# only trace inside keystone
|
||||
index = file.rfind('keystone')
|
||||
index = filename.rfind('keystone')
|
||||
if index == -1:
|
||||
continue
|
||||
stack += "File:%s:%s Method:%s() Line:%s | " \
|
||||
% (file[index:], line, method, function)
|
||||
% (filename[index:], line, method, function)
|
||||
|
||||
# strip trailing " | " from stack
|
||||
if stack:
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010-2011 OpenStack Foundation
|
||||
# Copyright 2012-2013 IBM Corp.
|
||||
# All Rights Reserved.
|
||||
@ -16,17 +14,18 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import commands
|
||||
import ConfigParser
|
||||
import functools
|
||||
import os
|
||||
import urlparse
|
||||
import subprocess
|
||||
|
||||
import lockfile
|
||||
import sqlalchemy
|
||||
import sqlalchemy.exc
|
||||
|
||||
from keystone.openstack.common import lockutils
|
||||
from keystone.openstack.common.gettextutils import _
|
||||
from keystone.openstack.common import log as logging
|
||||
from keystone.openstack.common.py3kcompat import urlutils
|
||||
from keystone.openstack.common import test
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -93,6 +92,22 @@ def get_db_connection_info(conn_pieces):
|
||||
return (user, password, database, host)
|
||||
|
||||
|
||||
def _set_db_lock(lock_path=None, lock_prefix=None):
|
||||
def decorator(f):
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
path = lock_path or os.environ.get("KEYSTONE_LOCK_PATH")
|
||||
lock = lockfile.FileLock(os.path.join(path, lock_prefix))
|
||||
with lock:
|
||||
LOG.debug(_('Got lock "%s"') % f.__name__)
|
||||
return f(*args, **kwargs)
|
||||
finally:
|
||||
LOG.debug(_('Lock released "%s"') % f.__name__)
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
class BaseMigrationTestCase(test.BaseTestCase):
|
||||
"""Base class fort testing of migration utils."""
|
||||
|
||||
@ -143,12 +158,13 @@ class BaseMigrationTestCase(test.BaseTestCase):
|
||||
super(BaseMigrationTestCase, self).tearDown()
|
||||
|
||||
def execute_cmd(self, cmd=None):
|
||||
status, output = commands.getstatusoutput(cmd)
|
||||
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
output = process.communicate()[0]
|
||||
LOG.debug(output)
|
||||
self.assertEqual(0, status,
|
||||
self.assertEqual(0, process.returncode,
|
||||
"Failed to run: %s\n%s" % (cmd, output))
|
||||
|
||||
@lockutils.synchronized('pgadmin', 'tests-', external=True)
|
||||
def _reset_pg(self, conn_pieces):
|
||||
(user, password, database, host) = get_db_connection_info(conn_pieces)
|
||||
os.environ['PGPASSWORD'] = password
|
||||
@ -170,10 +186,11 @@ class BaseMigrationTestCase(test.BaseTestCase):
|
||||
os.unsetenv('PGPASSWORD')
|
||||
os.unsetenv('PGUSER')
|
||||
|
||||
@_set_db_lock(lock_prefix='migration_tests-')
|
||||
def _reset_databases(self):
|
||||
for key, engine in self.engines.items():
|
||||
conn_string = self.test_databases[key]
|
||||
conn_pieces = urlparse.urlparse(conn_string)
|
||||
conn_pieces = urlutils.urlparse(conn_string)
|
||||
engine.dispose()
|
||||
if conn_string.startswith('sqlite'):
|
||||
# We can just delete the SQLite database, which is
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# Copyright 2010-2011 OpenStack Foundation.
|
||||
@ -96,7 +94,7 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
|
||||
if 'id' not in sort_keys:
|
||||
# TODO(justinsb): If this ever gives a false-positive, check
|
||||
# the actual primary key, rather than assuming its id
|
||||
LOG.warn(_('Id not in sort_keys; is sort_keys unique?'))
|
||||
LOG.warning(_('Id not in sort_keys; is sort_keys unique?'))
|
||||
|
||||
assert(not (sort_dir and sort_dirs))
|
||||
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2012 OpenStack Foundation.
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# Copyright 2012, Red Hat, Inc.
|
||||
#
|
||||
@ -24,6 +22,8 @@ import sys
|
||||
import time
|
||||
import traceback
|
||||
|
||||
import six
|
||||
|
||||
from keystone.openstack.common.gettextutils import _ # noqa
|
||||
|
||||
|
||||
@ -65,7 +65,7 @@ class save_and_reraise_exception(object):
|
||||
self.tb))
|
||||
return False
|
||||
if self.reraise:
|
||||
raise self.type_, self.value, self.tb
|
||||
six.reraise(self.type_, self.value, self.tb)
|
||||
|
||||
|
||||
def forever_retry_uncaught_exceptions(infunc):
|
||||
@ -77,7 +77,7 @@ def forever_retry_uncaught_exceptions(infunc):
|
||||
try:
|
||||
return infunc(*args, **kwargs)
|
||||
except Exception as exc:
|
||||
this_exc_message = unicode(exc)
|
||||
this_exc_message = six.u(str(exc))
|
||||
if this_exc_message == last_exc_message:
|
||||
exc_count += 1
|
||||
else:
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
@ -19,6 +17,7 @@
|
||||
import contextlib
|
||||
import errno
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
from keystone.openstack.common import excutils
|
||||
from keystone.openstack.common.gettextutils import _ # noqa
|
||||
@ -109,3 +108,30 @@ def file_open(*args, **kwargs):
|
||||
state at all (for unit tests)
|
||||
"""
|
||||
return file(*args, **kwargs)
|
||||
|
||||
|
||||
def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):
|
||||
"""Create temporary file or use existing file.
|
||||
|
||||
This util is needed for creating temporary file with
|
||||
specified content, suffix and prefix. If path is not None,
|
||||
it will be used for writing content. If the path doesn't
|
||||
exist it'll be created.
|
||||
|
||||
:param content: content for temporary file.
|
||||
:param path: same as parameter 'dir' for mkstemp
|
||||
:param suffix: same as parameter 'suffix' for mkstemp
|
||||
:param prefix: same as parameter 'prefix' for mkstemp
|
||||
|
||||
For example: it can be used in database tests for creating
|
||||
configuration files.
|
||||
"""
|
||||
if path:
|
||||
ensure_tree(path)
|
||||
|
||||
(fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix)
|
||||
try:
|
||||
os.write(fd, content)
|
||||
finally:
|
||||
os.close(fd)
|
||||
return path
|
||||
|
@ -1,4 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 Mirantis, Inc.
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
@ -30,7 +29,7 @@ class Config(fixtures.Fixture):
|
||||
the specified configuration option group.
|
||||
|
||||
All overrides are automatically cleared at the end of the current
|
||||
test by the reset() method, which is registred by addCleanup().
|
||||
test by the reset() method, which is registered by addCleanup().
|
||||
"""
|
||||
|
||||
def __init__(self, conf=cfg.CONF):
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012 Red Hat, Inc.
|
||||
# Copyright 2013 IBM Corp.
|
||||
# All Rights Reserved.
|
||||
@ -317,7 +315,7 @@ def get_available_languages(domain):
|
||||
# NOTE(luisg): Babel <1.0 used a function called list(), which was
|
||||
# renamed to locale_identifiers() in >=1.0, the requirements master list
|
||||
# requires >=0.9.6, uncapped, so defensively work with both. We can remove
|
||||
# this check when the master list updates to >=1.0, and all projects udpate
|
||||
# this check when the master list updates to >=1.0, and update all projects
|
||||
list_identifiers = (getattr(localedata, 'list', None) or
|
||||
getattr(localedata, 'locale_identifiers'))
|
||||
locale_identifiers = list_identifiers()
|
||||
@ -329,13 +327,21 @@ def get_available_languages(domain):
|
||||
|
||||
|
||||
def get_localized_message(message, user_locale):
|
||||
"""Gets a localized version of the given message in the given locale."""
|
||||
"""Gets a localized version of the given message in the given locale.
|
||||
|
||||
If the message is not a Message object the message is returned as-is.
|
||||
If the locale is None the message is translated to the default locale.
|
||||
|
||||
:returns: the translated message in unicode, or the original message if
|
||||
it could not be translated
|
||||
"""
|
||||
translated = message
|
||||
if isinstance(message, Message):
|
||||
if user_locale:
|
||||
message.locale = user_locale
|
||||
return six.text_type(message)
|
||||
else:
|
||||
return message
|
||||
original_locale = message.locale
|
||||
message.locale = user_locale
|
||||
translated = six.text_type(message)
|
||||
message.locale = original_locale
|
||||
return translated
|
||||
|
||||
|
||||
class LocaleHandler(logging.Handler):
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# Copyright 2011 Justin Santa Barbara
|
||||
@ -38,14 +36,19 @@ import functools
|
||||
import inspect
|
||||
import itertools
|
||||
import json
|
||||
import types
|
||||
import xmlrpclib
|
||||
try:
|
||||
import xmlrpclib
|
||||
except ImportError:
|
||||
# NOTE(jd): xmlrpclib is not shipped with Python 3
|
||||
xmlrpclib = None
|
||||
|
||||
import netaddr
|
||||
import six
|
||||
|
||||
from keystone.openstack.common import gettextutils
|
||||
from keystone.openstack.common import importutils
|
||||
from keystone.openstack.common import timeutils
|
||||
|
||||
netaddr = importutils.try_import("netaddr")
|
||||
|
||||
_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
|
||||
inspect.isfunction, inspect.isgeneratorfunction,
|
||||
@ -53,7 +56,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
|
||||
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
|
||||
inspect.isabstract]
|
||||
|
||||
_simple_types = (types.NoneType, int, basestring, bool, float, long)
|
||||
_simple_types = (six.string_types + six.integer_types
|
||||
+ (type(None), bool, float))
|
||||
|
||||
|
||||
def to_primitive(value, convert_instances=False, convert_datetime=True,
|
||||
@ -125,11 +129,13 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
|
||||
# It's not clear why xmlrpclib created their own DateTime type, but
|
||||
# for our purposes, make it a datetime type which is explicitly
|
||||
# handled
|
||||
if isinstance(value, xmlrpclib.DateTime):
|
||||
if xmlrpclib and isinstance(value, xmlrpclib.DateTime):
|
||||
value = datetime.datetime(*tuple(value.timetuple())[:6])
|
||||
|
||||
if convert_datetime and isinstance(value, datetime.datetime):
|
||||
return timeutils.strtime(value)
|
||||
elif isinstance(value, gettextutils.Message):
|
||||
return value.data
|
||||
elif hasattr(value, 'iteritems'):
|
||||
return recursive(dict(value.iteritems()), level=level + 1)
|
||||
elif hasattr(value, '__iter__'):
|
||||
@ -138,7 +144,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
|
||||
# Likely an instance of something. Watch for cycles.
|
||||
# Ignore class member vars.
|
||||
return recursive(value.__dict__, level=level + 1)
|
||||
elif isinstance(value, netaddr.IPAddress):
|
||||
elif netaddr and isinstance(value, netaddr.IPAddress):
|
||||
return six.text_type(value)
|
||||
else:
|
||||
if any(test(value) for test in _nasty_type_tests):
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
@ -20,6 +18,10 @@ import contextlib
|
||||
import errno
|
||||
import functools
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import weakref
|
||||
@ -39,6 +41,7 @@ util_opts = [
|
||||
cfg.BoolOpt('disable_process_locking', default=False,
|
||||
help='Whether to disable inter-process locks'),
|
||||
cfg.StrOpt('lock_path',
|
||||
default=os.environ.get("KEYSTONE_LOCK_PATH"),
|
||||
help=('Directory to use for lock files.'))
|
||||
]
|
||||
|
||||
@ -131,6 +134,7 @@ else:
|
||||
InterProcessLock = _PosixLock
|
||||
|
||||
_semaphores = weakref.WeakValueDictionary()
|
||||
_semaphores_lock = threading.Lock()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
@ -153,15 +157,12 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None):
|
||||
special location for external lock files to live. If nothing is set, then
|
||||
CONF.lock_path is used as a default.
|
||||
"""
|
||||
# NOTE(soren): If we ever go natively threaded, this will be racy.
|
||||
# See http://stackoverflow.com/questions/5390569/dyn
|
||||
# amically-allocating-and-destroying-mutexes
|
||||
sem = _semaphores.get(name, threading.Semaphore())
|
||||
if name not in _semaphores:
|
||||
# this check is not racy - we're already holding ref locally
|
||||
# so GC won't remove the item and there was no IO switch
|
||||
# (only valid in greenthreads)
|
||||
_semaphores[name] = sem
|
||||
with _semaphores_lock:
|
||||
try:
|
||||
sem = _semaphores[name]
|
||||
except KeyError:
|
||||
sem = threading.Semaphore()
|
||||
_semaphores[name] = sem
|
||||
|
||||
with sem:
|
||||
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
|
||||
@ -241,13 +242,14 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
|
||||
def wrap(f):
|
||||
@functools.wraps(f)
|
||||
def inner(*args, **kwargs):
|
||||
with lock(name, lock_file_prefix, external, lock_path):
|
||||
LOG.debug(_('Got semaphore / lock "%(function)s"'),
|
||||
try:
|
||||
with lock(name, lock_file_prefix, external, lock_path):
|
||||
LOG.debug(_('Got semaphore / lock "%(function)s"'),
|
||||
{'function': f.__name__})
|
||||
return f(*args, **kwargs)
|
||||
finally:
|
||||
LOG.debug(_('Semaphore / lock released "%(function)s"'),
|
||||
{'function': f.__name__})
|
||||
return f(*args, **kwargs)
|
||||
|
||||
LOG.debug(_('Semaphore / lock released "%(function)s"'),
|
||||
{'function': f.__name__})
|
||||
return inner
|
||||
return wrap
|
||||
|
||||
@ -275,3 +277,27 @@ def synchronized_with_prefix(lock_file_prefix):
|
||||
"""
|
||||
|
||||
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
|
||||
|
||||
|
||||
def main(argv):
|
||||
"""Create a dir for locks and pass it to command from arguments
|
||||
|
||||
If you run this:
|
||||
python -m openstack.common.lockutils python setup.py testr <etc>
|
||||
|
||||
a temporary directory will be created for all your locks and passed to all
|
||||
your tests in an environment variable. The temporary dir will be deleted
|
||||
afterwards and the return value will be preserved.
|
||||
"""
|
||||
|
||||
lock_dir = tempfile.mkdtemp()
|
||||
os.environ["KEYSTONE_LOCK_PATH"] = lock_dir
|
||||
try:
|
||||
ret_val = subprocess.call(argv[1:])
|
||||
finally:
|
||||
shutil.rmtree(lock_dir, ignore_errors=True)
|
||||
return ret_val
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main(sys.argv))
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
@ -51,7 +49,7 @@ from keystone.openstack.common import local
|
||||
|
||||
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password']
|
||||
_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password']
|
||||
|
||||
# NOTE(ldbragst): Let's build a list of regex objects using the list of
|
||||
# _SANITIZE_KEYS we already have. This way, we only have to add the new key
|
||||
@ -132,7 +130,7 @@ generic_log_opts = [
|
||||
log_opts = [
|
||||
cfg.StrOpt('logging_context_format_string',
|
||||
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
|
||||
'%(name)s [%(request_id)s %(user)s %(tenant)s] '
|
||||
'%(name)s [%(request_id)s %(user_identity)s] '
|
||||
'%(instance)s%(message)s',
|
||||
help='format string to use for log messages with context'),
|
||||
cfg.StrOpt('logging_default_format_string',
|
||||
@ -155,6 +153,7 @@ log_opts = [
|
||||
'qpid=WARN',
|
||||
'sqlalchemy=WARN',
|
||||
'suds=INFO',
|
||||
'iso8601=WARN',
|
||||
],
|
||||
help='list of logger=LEVEL pairs'),
|
||||
cfg.BoolOpt('publish_errors',
|
||||
@ -333,10 +332,12 @@ class ContextAdapter(BaseLoggerAdapter):
|
||||
elif instance_uuid:
|
||||
instance_extra = (CONF.instance_uuid_format
|
||||
% {'uuid': instance_uuid})
|
||||
extra.update({'instance': instance_extra})
|
||||
extra['instance'] = instance_extra
|
||||
|
||||
extra.update({"project": self.project})
|
||||
extra.update({"version": self.version})
|
||||
extra.setdefault('user_identity', kwargs.pop('user_identity', None))
|
||||
|
||||
extra['project'] = self.project
|
||||
extra['version'] = self.version
|
||||
extra['extra'] = extra.copy()
|
||||
return msg, kwargs
|
||||
|
||||
@ -350,7 +351,7 @@ class JSONFormatter(logging.Formatter):
|
||||
def formatException(self, ei, strip_newlines=True):
|
||||
lines = traceback.format_exception(*ei)
|
||||
if strip_newlines:
|
||||
lines = [itertools.ifilter(
|
||||
lines = [moves.filter(
|
||||
lambda x: x,
|
||||
line.rstrip().splitlines()) for line in lines]
|
||||
lines = list(itertools.chain(*lines))
|
||||
@ -388,10 +389,10 @@ class JSONFormatter(logging.Formatter):
|
||||
|
||||
|
||||
def _create_logging_excepthook(product_name):
|
||||
def logging_excepthook(type, value, tb):
|
||||
def logging_excepthook(exc_type, value, tb):
|
||||
extra = {}
|
||||
if CONF.verbose:
|
||||
extra['exc_info'] = (type, value, tb)
|
||||
extra['exc_info'] = (exc_type, value, tb)
|
||||
getLogger(product_name).critical(str(value), **extra)
|
||||
return logging_excepthook
|
||||
|
||||
@ -476,7 +477,7 @@ def _setup_logging_from_conf():
|
||||
streamlog = ColorHandler()
|
||||
log_root.addHandler(streamlog)
|
||||
|
||||
elif not CONF.log_file:
|
||||
elif not logpath:
|
||||
# pass sys.stdout as a positional argument
|
||||
# python2.6 calls the argument strm, in 2.7 it's stream
|
||||
streamlog = logging.StreamHandler(sys.stdout)
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# Copyright 2011 Justin Santa Barbara
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
@ -19,7 +17,7 @@
|
||||
Network-related utilities and helper functions.
|
||||
"""
|
||||
|
||||
import urlparse
|
||||
from keystone.openstack.common.py3kcompat import urlutils
|
||||
|
||||
|
||||
def parse_host_port(address, default_port=None):
|
||||
@ -72,10 +70,10 @@ def urlsplit(url, scheme='', allow_fragments=True):
|
||||
|
||||
The parameters are the same as urlparse.urlsplit.
|
||||
"""
|
||||
scheme, netloc, path, query, fragment = urlparse.urlsplit(
|
||||
scheme, netloc, path, query, fragment = urlutils.urlsplit(
|
||||
url, scheme, allow_fragments)
|
||||
if allow_fragments and '#' in path:
|
||||
path, fragment = path.split('#', 1)
|
||||
if '?' in path:
|
||||
path, query = path.split('?', 1)
|
||||
return urlparse.SplitResult(scheme, netloc, path, query, fragment)
|
||||
return urlutils.SplitResult(scheme, netloc, path, query, fragment)
|
||||
|
@ -43,4 +43,5 @@ def notify(context, message):
|
||||
rpc.notify(context, topic, message)
|
||||
except Exception:
|
||||
LOG.exception(_("Could not send notification to %(topic)s. "
|
||||
"Payload=%(message)s"), locals())
|
||||
"Payload=%(message)s"),
|
||||
{"topic": topic, "message": message})
|
||||
|
@ -49,4 +49,5 @@ def notify(context, message):
|
||||
rpc.notify(context, topic, message, envelope=True)
|
||||
except Exception:
|
||||
LOG.exception(_("Could not send notification to %(topic)s. "
|
||||
"Payload=%(message)s"), locals())
|
||||
"Payload=%(message)s"),
|
||||
{"topic": topic, "message": message})
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2012 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
@ -221,7 +219,7 @@ class Enforcer(object):
|
||||
if policy_file:
|
||||
return policy_file
|
||||
|
||||
raise cfg.ConfigFilesNotFoundError((CONF.policy_file,))
|
||||
raise cfg.ConfigFilesNotFoundError((self.policy_file,))
|
||||
|
||||
def enforce(self, rule, target, creds, do_raise=False,
|
||||
exc=None, *args, **kwargs):
|
||||
@ -279,11 +277,10 @@ class Enforcer(object):
|
||||
return result
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class BaseCheck(object):
|
||||
"""Abstract base class for Check classes."""
|
||||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@abc.abstractmethod
|
||||
def __str__(self):
|
||||
"""String representation of the Check tree rooted at this node."""
|
||||
@ -506,7 +503,7 @@ def _parse_list_rule(rule):
|
||||
continue
|
||||
|
||||
# Handle bare strings
|
||||
if isinstance(inner_rule, basestring):
|
||||
if isinstance(inner_rule, six.string_types):
|
||||
inner_rule = [inner_rule]
|
||||
|
||||
# Parse the inner rules into Check objects
|
||||
@ -626,6 +623,7 @@ def reducer(*tokens):
|
||||
return decorator
|
||||
|
||||
|
||||
@six.add_metaclass(ParseStateMeta)
|
||||
class ParseState(object):
|
||||
"""Implement the core of parsing the policy language.
|
||||
|
||||
@ -638,8 +636,6 @@ class ParseState(object):
|
||||
shouldn't be that big a problem.
|
||||
"""
|
||||
|
||||
__metaclass__ = ParseStateMeta
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the ParseState."""
|
||||
|
||||
@ -765,7 +761,7 @@ def parse_rule(rule):
|
||||
"""Parses a policy rule into a tree of Check objects."""
|
||||
|
||||
# If the rule is a string, it's in the policy language
|
||||
if isinstance(rule, basestring):
|
||||
if isinstance(rule, six.string_types):
|
||||
return _parse_text_rule(rule)
|
||||
return _parse_list_rule(rule)
|
||||
|
||||
|
248
keystone/openstack/common/processutils.py
Normal file
248
keystone/openstack/common/processutils.py
Normal file
@ -0,0 +1,248 @@
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
System-level utilities and helper functions.
|
||||
"""
|
||||
|
||||
import logging as stdlib_logging
|
||||
import os
|
||||
import random
|
||||
import shlex
|
||||
import signal
|
||||
|
||||
from eventlet.green import subprocess
|
||||
from eventlet import greenthread
|
||||
|
||||
from keystone.openstack.common.gettextutils import _ # noqa
|
||||
from keystone.openstack.common import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InvalidArgumentError(Exception):
|
||||
def __init__(self, message=None):
|
||||
super(InvalidArgumentError, self).__init__(message)
|
||||
|
||||
|
||||
class UnknownArgumentError(Exception):
|
||||
def __init__(self, message=None):
|
||||
super(UnknownArgumentError, self).__init__(message)
|
||||
|
||||
|
||||
class ProcessExecutionError(Exception):
|
||||
def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
|
||||
description=None):
|
||||
self.exit_code = exit_code
|
||||
self.stderr = stderr
|
||||
self.stdout = stdout
|
||||
self.cmd = cmd
|
||||
self.description = description
|
||||
|
||||
if description is None:
|
||||
description = "Unexpected error while running command."
|
||||
if exit_code is None:
|
||||
exit_code = '-'
|
||||
message = ("%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r"
|
||||
% (description, cmd, exit_code, stdout, stderr))
|
||||
super(ProcessExecutionError, self).__init__(message)
|
||||
|
||||
|
||||
class NoRootWrapSpecified(Exception):
|
||||
def __init__(self, message=None):
|
||||
super(NoRootWrapSpecified, self).__init__(message)
|
||||
|
||||
|
||||
def _subprocess_setup():
|
||||
# Python installs a SIGPIPE handler by default. This is usually not what
|
||||
# non-Python subprocesses expect.
|
||||
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
|
||||
|
||||
|
||||
def execute(*cmd, **kwargs):
|
||||
"""Helper method to shell out and execute a command through subprocess.
|
||||
|
||||
Allows optional retry.
|
||||
|
||||
:param cmd: Passed to subprocess.Popen.
|
||||
:type cmd: string
|
||||
:param process_input: Send to opened process.
|
||||
:type proces_input: string
|
||||
:param check_exit_code: Single bool, int, or list of allowed exit
|
||||
codes. Defaults to [0]. Raise
|
||||
:class:`ProcessExecutionError` unless
|
||||
program exits with one of these code.
|
||||
:type check_exit_code: boolean, int, or [int]
|
||||
:param delay_on_retry: True | False. Defaults to True. If set to True,
|
||||
wait a short amount of time before retrying.
|
||||
:type delay_on_retry: boolean
|
||||
:param attempts: How many times to retry cmd.
|
||||
:type attempts: int
|
||||
:param run_as_root: True | False. Defaults to False. If set to True,
|
||||
the command is prefixed by the command specified
|
||||
in the root_helper kwarg.
|
||||
:type run_as_root: boolean
|
||||
:param root_helper: command to prefix to commands called with
|
||||
run_as_root=True
|
||||
:type root_helper: string
|
||||
:param shell: whether or not there should be a shell used to
|
||||
execute this command. Defaults to false.
|
||||
:type shell: boolean
|
||||
:param loglevel: log level for execute commands.
|
||||
:type loglevel: int. (Should be stdlib_logging.DEBUG or
|
||||
stdlib_logging.INFO)
|
||||
:returns: (stdout, stderr) from process execution
|
||||
:raises: :class:`UnknownArgumentError` on
|
||||
receiving unknown arguments
|
||||
:raises: :class:`ProcessExecutionError`
|
||||
"""
|
||||
|
||||
process_input = kwargs.pop('process_input', None)
|
||||
check_exit_code = kwargs.pop('check_exit_code', [0])
|
||||
ignore_exit_code = False
|
||||
delay_on_retry = kwargs.pop('delay_on_retry', True)
|
||||
attempts = kwargs.pop('attempts', 1)
|
||||
run_as_root = kwargs.pop('run_as_root', False)
|
||||
root_helper = kwargs.pop('root_helper', '')
|
||||
shell = kwargs.pop('shell', False)
|
||||
loglevel = kwargs.pop('loglevel', stdlib_logging.DEBUG)
|
||||
|
||||
if isinstance(check_exit_code, bool):
|
||||
ignore_exit_code = not check_exit_code
|
||||
check_exit_code = [0]
|
||||
elif isinstance(check_exit_code, int):
|
||||
check_exit_code = [check_exit_code]
|
||||
|
||||
if kwargs:
|
||||
raise UnknownArgumentError(_('Got unknown keyword args '
|
||||
'to utils.execute: %r') % kwargs)
|
||||
|
||||
if run_as_root and hasattr(os, 'geteuid') and os.geteuid() != 0:
|
||||
if not root_helper:
|
||||
raise NoRootWrapSpecified(
|
||||
message=('Command requested root, but did not specify a root '
|
||||
'helper.'))
|
||||
cmd = shlex.split(root_helper) + list(cmd)
|
||||
|
||||
cmd = map(str, cmd)
|
||||
|
||||
while attempts > 0:
|
||||
attempts -= 1
|
||||
try:
|
||||
LOG.log(loglevel, _('Running cmd (subprocess): %s'), ' '.join(cmd))
|
||||
_PIPE = subprocess.PIPE # pylint: disable=E1101
|
||||
|
||||
if os.name == 'nt':
|
||||
preexec_fn = None
|
||||
close_fds = False
|
||||
else:
|
||||
preexec_fn = _subprocess_setup
|
||||
close_fds = True
|
||||
|
||||
obj = subprocess.Popen(cmd,
|
||||
stdin=_PIPE,
|
||||
stdout=_PIPE,
|
||||
stderr=_PIPE,
|
||||
close_fds=close_fds,
|
||||
preexec_fn=preexec_fn,
|
||||
shell=shell)
|
||||
result = None
|
||||
if process_input is not None:
|
||||
result = obj.communicate(process_input)
|
||||
else:
|
||||
result = obj.communicate()
|
||||
obj.stdin.close() # pylint: disable=E1101
|
||||
_returncode = obj.returncode # pylint: disable=E1101
|
||||
LOG.log(loglevel, _('Result was %s') % _returncode)
|
||||
if not ignore_exit_code and _returncode not in check_exit_code:
|
||||
(stdout, stderr) = result
|
||||
raise ProcessExecutionError(exit_code=_returncode,
|
||||
stdout=stdout,
|
||||
stderr=stderr,
|
||||
cmd=' '.join(cmd))
|
||||
return result
|
||||
except ProcessExecutionError:
|
||||
if not attempts:
|
||||
raise
|
||||
else:
|
||||
LOG.log(loglevel, _('%r failed. Retrying.'), cmd)
|
||||
if delay_on_retry:
|
||||
greenthread.sleep(random.randint(20, 200) / 100.0)
|
||||
finally:
|
||||
# NOTE(termie): this appears to be necessary to let the subprocess
|
||||
# call clean something up in between calls, without
|
||||
# it two execute calls in a row hangs the second one
|
||||
greenthread.sleep(0)
|
||||
|
||||
|
||||
def trycmd(*args, **kwargs):
|
||||
"""A wrapper around execute() to more easily handle warnings and errors.
|
||||
|
||||
Returns an (out, err) tuple of strings containing the output of
|
||||
the command's stdout and stderr. If 'err' is not empty then the
|
||||
command can be considered to have failed.
|
||||
|
||||
:discard_warnings True | False. Defaults to False. If set to True,
|
||||
then for succeeding commands, stderr is cleared
|
||||
|
||||
"""
|
||||
discard_warnings = kwargs.pop('discard_warnings', False)
|
||||
|
||||
try:
|
||||
out, err = execute(*args, **kwargs)
|
||||
failed = False
|
||||
except ProcessExecutionError as exn:
|
||||
out, err = '', str(exn)
|
||||
failed = True
|
||||
|
||||
if not failed and discard_warnings and err:
|
||||
# Handle commands that output to stderr but otherwise succeed
|
||||
err = ''
|
||||
|
||||
return out, err
|
||||
|
||||
|
||||
def ssh_execute(ssh, cmd, process_input=None,
|
||||
addl_env=None, check_exit_code=True):
|
||||
LOG.debug(_('Running cmd (SSH): %s'), cmd)
|
||||
if addl_env:
|
||||
raise InvalidArgumentError(_('Environment not supported over SSH'))
|
||||
|
||||
if process_input:
|
||||
# This is (probably) fixable if we need it...
|
||||
raise InvalidArgumentError(_('process_input not supported over SSH'))
|
||||
|
||||
stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd)
|
||||
channel = stdout_stream.channel
|
||||
|
||||
# NOTE(justinsb): This seems suspicious...
|
||||
# ...other SSH clients have buffering issues with this approach
|
||||
stdout = stdout_stream.read()
|
||||
stderr = stderr_stream.read()
|
||||
stdin_stream.close()
|
||||
|
||||
exit_status = channel.recv_exit_status()
|
||||
|
||||
# exit_status == -1 if no exit code was returned
|
||||
if exit_status != -1:
|
||||
LOG.debug(_('Result was %s') % exit_status)
|
||||
if check_exit_code and exit_status != 0:
|
||||
raise ProcessExecutionError(exit_code=exit_status,
|
||||
stdout=stdout,
|
||||
stderr=stderr,
|
||||
cmd=cmd)
|
||||
|
||||
return (stdout, stderr)
|
16
keystone/openstack/common/py3kcompat/__init__.py
Normal file
16
keystone/openstack/common/py3kcompat/__init__.py
Normal file
@ -0,0 +1,16 @@
|
||||
#
|
||||
# Copyright 2013 Canonical Ltd.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
63
keystone/openstack/common/py3kcompat/urlutils.py
Normal file
63
keystone/openstack/common/py3kcompat/urlutils.py
Normal file
@ -0,0 +1,63 @@
|
||||
#
|
||||
# Copyright 2013 Canonical Ltd.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
"""
|
||||
Python2/Python3 compatibility layer for OpenStack
|
||||
"""
|
||||
|
||||
import six
|
||||
|
||||
if six.PY3:
|
||||
# python3
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
|
||||
urlencode = urllib.parse.urlencode
|
||||
urljoin = urllib.parse.urljoin
|
||||
quote = urllib.parse.quote
|
||||
parse_qsl = urllib.parse.parse_qsl
|
||||
unquote = urllib.parse.unquote
|
||||
urlparse = urllib.parse.urlparse
|
||||
urlsplit = urllib.parse.urlsplit
|
||||
urlunsplit = urllib.parse.urlunsplit
|
||||
SplitResult = urllib.parse.SplitResult
|
||||
|
||||
urlopen = urllib.request.urlopen
|
||||
URLError = urllib.error.URLError
|
||||
pathname2url = urllib.request.pathname2url
|
||||
else:
|
||||
# python2
|
||||
import urllib
|
||||
import urllib2
|
||||
import urlparse
|
||||
|
||||
urlencode = urllib.urlencode
|
||||
quote = urllib.quote
|
||||
unquote = urllib.unquote
|
||||
|
||||
parse = urlparse
|
||||
parse_qsl = parse.parse_qsl
|
||||
urljoin = parse.urljoin
|
||||
urlparse = parse.urlparse
|
||||
urlsplit = parse.urlsplit
|
||||
urlunsplit = parse.urlunsplit
|
||||
SplitResult = parse.SplitResult
|
||||
|
||||
urlopen = urllib2.urlopen
|
||||
URLError = urllib2.URLError
|
||||
pathname2url = urllib.pathname2url
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
@ -56,13 +54,12 @@ rpc_opts = [
|
||||
help='Seconds to wait before a cast expires (TTL). '
|
||||
'Only supported by impl_zmq.'),
|
||||
cfg.ListOpt('allowed_rpc_exception_modules',
|
||||
default=['keystone.openstack.common.exception',
|
||||
'nova.exception',
|
||||
default=['nova.exception',
|
||||
'cinder.exception',
|
||||
'exceptions',
|
||||
],
|
||||
help='Modules of exceptions that are permitted to be recreated'
|
||||
'upon receiving exception data from an rpc call.'),
|
||||
' upon receiving exception data from an rpc call.'),
|
||||
cfg.BoolOpt('fake_rabbit',
|
||||
default=False,
|
||||
help='If passed, use a fake RabbitMQ provider'),
|
||||
@ -228,7 +225,7 @@ def notify(context, topic, msg, envelope=False):
|
||||
|
||||
|
||||
def cleanup():
|
||||
"""Clean up resoruces in use by implementation.
|
||||
"""Clean up resources in use by implementation.
|
||||
|
||||
Clean up any resources that have been allocated by the RPC implementation.
|
||||
This is typically open connections to a messaging service. This function
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
@ -20,9 +18,9 @@
|
||||
"""
|
||||
Shared code between AMQP based openstack.common.rpc implementations.
|
||||
|
||||
The code in this module is shared between the rpc implemenations based on AMQP.
|
||||
Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
|
||||
AMQP, but is deprecated and predates this code.
|
||||
The code in this module is shared between the rpc implementations based on
|
||||
AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also
|
||||
uses AMQP, but is deprecated and predates this code.
|
||||
"""
|
||||
|
||||
import collections
|
||||
@ -189,7 +187,7 @@ class ReplyProxy(ConnectionContext):
|
||||
def __init__(self, conf, connection_pool):
|
||||
self._call_waiters = {}
|
||||
self._num_call_waiters = 0
|
||||
self._num_call_waiters_wrn_threshhold = 10
|
||||
self._num_call_waiters_wrn_threshold = 10
|
||||
self._reply_q = 'reply_' + uuid.uuid4().hex
|
||||
super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
|
||||
self.declare_direct_consumer(self._reply_q, self._process_data)
|
||||
@ -208,11 +206,11 @@ class ReplyProxy(ConnectionContext):
|
||||
|
||||
def add_call_waiter(self, waiter, msg_id):
|
||||
self._num_call_waiters += 1
|
||||
if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
|
||||
if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
|
||||
LOG.warn(_('Number of call waiters is greater than warning '
|
||||
'threshhold: %d. There could be a MulticallProxyWaiter '
|
||||
'leak.') % self._num_call_waiters_wrn_threshhold)
|
||||
self._num_call_waiters_wrn_threshhold *= 2
|
||||
'threshold: %d. There could be a MulticallProxyWaiter '
|
||||
'leak.') % self._num_call_waiters_wrn_threshold)
|
||||
self._num_call_waiters_wrn_threshold *= 2
|
||||
self._call_waiters[msg_id] = waiter
|
||||
|
||||
def del_call_waiter(self, msg_id):
|
||||
@ -241,7 +239,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
|
||||
_add_unique_id(msg)
|
||||
# If a reply_q exists, add the msg_id to the reply and pass the
|
||||
# reply_q to direct_send() to use it as the response queue.
|
||||
# Otherwise use the msg_id for backward compatibilty.
|
||||
# Otherwise use the msg_id for backward compatibility.
|
||||
if reply_q:
|
||||
msg['_msg_id'] = msg_id
|
||||
conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
|
||||
@ -364,22 +362,43 @@ class CallbackWrapper(_ThreadPoolWithWait):
|
||||
Allows it to be invoked in a green thread.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, callback, connection_pool):
|
||||
def __init__(self, conf, callback, connection_pool,
|
||||
wait_for_consumers=False):
|
||||
"""Initiates CallbackWrapper object.
|
||||
|
||||
:param conf: cfg.CONF instance
|
||||
:param callback: a callable (probably a function)
|
||||
:param connection_pool: connection pool as returned by
|
||||
get_connection_pool()
|
||||
:param wait_for_consumers: wait for all green threads to
|
||||
complete and raise the last
|
||||
caught exception, if any.
|
||||
|
||||
"""
|
||||
super(CallbackWrapper, self).__init__(
|
||||
conf=conf,
|
||||
connection_pool=connection_pool,
|
||||
)
|
||||
self.callback = callback
|
||||
self.wait_for_consumers = wait_for_consumers
|
||||
self.exc_info = None
|
||||
|
||||
def _wrap(self, message_data, **kwargs):
|
||||
"""Wrap the callback invocation to catch exceptions.
|
||||
"""
|
||||
try:
|
||||
self.callback(message_data, **kwargs)
|
||||
except Exception:
|
||||
self.exc_info = sys.exc_info()
|
||||
|
||||
def __call__(self, message_data):
|
||||
self.pool.spawn_n(self.callback, message_data)
|
||||
self.exc_info = None
|
||||
self.pool.spawn_n(self._wrap, message_data)
|
||||
|
||||
if self.wait_for_consumers:
|
||||
self.pool.waitall()
|
||||
if self.exc_info:
|
||||
raise self.exc_info[1], None, self.exc_info[2]
|
||||
|
||||
|
||||
class ProxyCallback(_ThreadPoolWithWait):
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
@ -29,6 +27,7 @@ from keystone.openstack.common import importutils
|
||||
from keystone.openstack.common import jsonutils
|
||||
from keystone.openstack.common import local
|
||||
from keystone.openstack.common import log as logging
|
||||
from keystone.openstack.common import versionutils
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -265,7 +264,7 @@ def _safe_log(log_func, msg, msg_data):
|
||||
|
||||
def _fix_passwords(d):
|
||||
"""Sanitizes the password fields in the dictionary."""
|
||||
for k in d.iterkeys():
|
||||
for k in six.iterkeys(d):
|
||||
if k.lower().find('password') != -1:
|
||||
d[k] = '<SANITIZED>'
|
||||
elif k.lower() in SANITIZE:
|
||||
@ -441,19 +440,15 @@ def client_exceptions(*exceptions):
|
||||
return outer
|
||||
|
||||
|
||||
# TODO(sirp): we should deprecate this in favor of
|
||||
# using `versionutils.is_compatible` directly
|
||||
def version_is_compatible(imp_version, version):
|
||||
"""Determine whether versions are compatible.
|
||||
|
||||
:param imp_version: The version implemented
|
||||
:param version: The version requested by an incoming message.
|
||||
"""
|
||||
version_parts = version.split('.')
|
||||
imp_version_parts = imp_version.split('.')
|
||||
if int(version_parts[0]) != int(imp_version_parts[0]): # Major
|
||||
return False
|
||||
if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
|
||||
return False
|
||||
return True
|
||||
return versionutils.is_compatible(version, imp_version)
|
||||
|
||||
|
||||
def serialize_msg(raw_msg):
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -146,7 +144,7 @@ def multicall(conf, context, topic, msg, timeout=None):
|
||||
try:
|
||||
consumer = CONSUMERS[topic][0]
|
||||
except (KeyError, IndexError):
|
||||
return iter([None])
|
||||
raise rpc_common.Timeout("No consumers available")
|
||||
else:
|
||||
return consumer.call(context, version, method, namespace, args,
|
||||
timeout)
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -28,6 +26,7 @@ import kombu.connection
|
||||
import kombu.entity
|
||||
import kombu.messaging
|
||||
from oslo.config import cfg
|
||||
import six
|
||||
|
||||
from keystone.openstack.common import excutils
|
||||
from keystone.openstack.common.gettextutils import _ # noqa
|
||||
@ -146,29 +145,23 @@ class ConsumerBase(object):
|
||||
Messages that are processed without exception are ack'ed.
|
||||
|
||||
If the message processing generates an exception, it will be
|
||||
ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
|
||||
Rejection is better than waiting for the message to timeout.
|
||||
Rejected messages are immediately requeued.
|
||||
ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
|
||||
"""
|
||||
|
||||
ack_msg = False
|
||||
try:
|
||||
msg = rpc_common.deserialize_msg(message.payload)
|
||||
callback(msg)
|
||||
ack_msg = True
|
||||
except Exception:
|
||||
if self.ack_on_error:
|
||||
ack_msg = True
|
||||
LOG.exception(_("Failed to process message"
|
||||
" ... skipping it."))
|
||||
message.ack()
|
||||
else:
|
||||
LOG.exception(_("Failed to process message"
|
||||
" ... will requeue."))
|
||||
finally:
|
||||
if ack_msg:
|
||||
message.ack()
|
||||
else:
|
||||
message.reject()
|
||||
message.requeue()
|
||||
else:
|
||||
message.ack()
|
||||
|
||||
def consume(self, *args, **kwargs):
|
||||
"""Actually declare the consumer on the amqp channel. This will
|
||||
@ -631,7 +624,7 @@ class Connection(object):
|
||||
|
||||
def _declare_consumer():
|
||||
consumer = consumer_cls(self.conf, self.channel, topic, callback,
|
||||
self.consumer_num.next())
|
||||
six.next(self.consumer_num))
|
||||
self.consumers.append(consumer)
|
||||
return consumer
|
||||
|
||||
@ -738,7 +731,7 @@ class Connection(object):
|
||||
it = self.iterconsume(limit=limit)
|
||||
while True:
|
||||
try:
|
||||
it.next()
|
||||
six.next(it)
|
||||
except StopIteration:
|
||||
return
|
||||
|
||||
@ -789,6 +782,7 @@ class Connection(object):
|
||||
callback=callback,
|
||||
connection_pool=rpc_amqp.get_connection_pool(self.conf,
|
||||
Connection),
|
||||
wait_for_consumers=not ack_on_error
|
||||
)
|
||||
self.proxy_callbacks.append(callback_wrapper)
|
||||
self.declare_topic_consumer(
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
# Copyright 2011 - 2012, Red Hat, Inc.
|
||||
#
|
||||
@ -18,11 +16,11 @@
|
||||
import functools
|
||||
import itertools
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import eventlet
|
||||
import greenlet
|
||||
from oslo.config import cfg
|
||||
import six
|
||||
|
||||
from keystone.openstack.common import excutils
|
||||
from keystone.openstack.common.gettextutils import _ # noqa
|
||||
@ -67,6 +65,17 @@ qpid_opts = [
|
||||
cfg.BoolOpt('qpid_tcp_nodelay',
|
||||
default=True,
|
||||
help='Disable Nagle algorithm'),
|
||||
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
|
||||
# this file could probably use some additional refactoring so that the
|
||||
# differences between each version are split into different classes.
|
||||
cfg.IntOpt('qpid_topology_version',
|
||||
default=1,
|
||||
help="The qpid topology version to use. Version 1 is what "
|
||||
"was originally used by impl_qpid. Version 2 includes "
|
||||
"some backwards-incompatible changes that allow broker "
|
||||
"federation to work. Users should update to version 2 "
|
||||
"when they are able to take everything down, as it "
|
||||
"requires a clean break."),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(qpid_opts)
|
||||
@ -74,10 +83,17 @@ cfg.CONF.register_opts(qpid_opts)
|
||||
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
|
||||
|
||||
|
||||
def raise_invalid_topology_version(conf):
|
||||
msg = (_("Invalid value for qpid_topology_version: %d") %
|
||||
conf.qpid_topology_version)
|
||||
LOG.error(msg)
|
||||
raise Exception(msg)
|
||||
|
||||
|
||||
class ConsumerBase(object):
|
||||
"""Consumer base class."""
|
||||
|
||||
def __init__(self, session, callback, node_name, node_opts,
|
||||
def __init__(self, conf, session, callback, node_name, node_opts,
|
||||
link_name, link_opts):
|
||||
"""Declare a queue on an amqp session.
|
||||
|
||||
@ -95,26 +111,39 @@ class ConsumerBase(object):
|
||||
self.receiver = None
|
||||
self.session = None
|
||||
|
||||
addr_opts = {
|
||||
"create": "always",
|
||||
"node": {
|
||||
"type": "topic",
|
||||
"x-declare": {
|
||||
if conf.qpid_topology_version == 1:
|
||||
addr_opts = {
|
||||
"create": "always",
|
||||
"node": {
|
||||
"type": "topic",
|
||||
"x-declare": {
|
||||
"durable": True,
|
||||
"auto-delete": True,
|
||||
},
|
||||
},
|
||||
"link": {
|
||||
"durable": True,
|
||||
"auto-delete": True,
|
||||
"x-declare": {
|
||||
"durable": False,
|
||||
"auto-delete": True,
|
||||
"exclusive": False,
|
||||
},
|
||||
},
|
||||
},
|
||||
"link": {
|
||||
"name": link_name,
|
||||
"durable": True,
|
||||
"x-declare": {
|
||||
"durable": False,
|
||||
"auto-delete": True,
|
||||
"exclusive": False,
|
||||
}
|
||||
if link_name:
|
||||
addr_opts["link"]["name"] = link_name
|
||||
addr_opts["node"]["x-declare"].update(node_opts)
|
||||
elif conf.qpid_topology_version == 2:
|
||||
addr_opts = {
|
||||
"link": {
|
||||
"x-declare": {
|
||||
"auto-delete": True,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
addr_opts["node"]["x-declare"].update(node_opts)
|
||||
}
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
addr_opts["link"]["x-declare"].update(link_opts)
|
||||
|
||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||
@ -122,7 +151,7 @@ class ConsumerBase(object):
|
||||
self.connect(session)
|
||||
|
||||
def connect(self, session):
|
||||
"""Declare the reciever on connect."""
|
||||
"""Declare the receiver on connect."""
|
||||
self._declare_receiver(session)
|
||||
|
||||
def reconnect(self, session):
|
||||
@ -181,16 +210,24 @@ class DirectConsumer(ConsumerBase):
|
||||
'callback' is the callback to call when messages are received
|
||||
"""
|
||||
|
||||
super(DirectConsumer, self).__init__(
|
||||
session, callback,
|
||||
"%s/%s" % (msg_id, msg_id),
|
||||
{"type": "direct"},
|
||||
msg_id,
|
||||
{
|
||||
"auto-delete": conf.amqp_auto_delete,
|
||||
"exclusive": True,
|
||||
"durable": conf.amqp_durable_queues,
|
||||
})
|
||||
link_opts = {
|
||||
"auto-delete": conf.amqp_auto_delete,
|
||||
"exclusive": True,
|
||||
"durable": conf.amqp_durable_queues,
|
||||
}
|
||||
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s/%s" % (msg_id, msg_id)
|
||||
node_opts = {"type": "direct"}
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.direct/%s" % msg_id
|
||||
node_opts = {}
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(DirectConsumer, self).__init__(conf, session, callback,
|
||||
node_name, node_opts, msg_id,
|
||||
link_opts)
|
||||
|
||||
|
||||
class TopicConsumer(ConsumerBase):
|
||||
@ -208,14 +245,20 @@ class TopicConsumer(ConsumerBase):
|
||||
"""
|
||||
|
||||
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
|
||||
super(TopicConsumer, self).__init__(
|
||||
session, callback,
|
||||
"%s/%s" % (exchange_name, topic),
|
||||
{}, name or topic,
|
||||
{
|
||||
"auto-delete": conf.amqp_auto_delete,
|
||||
"durable": conf.amqp_durable_queues,
|
||||
})
|
||||
link_opts = {
|
||||
"auto-delete": conf.amqp_auto_delete,
|
||||
"durable": conf.amqp_durable_queues,
|
||||
}
|
||||
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s/%s" % (exchange_name, topic)
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(TopicConsumer, self).__init__(conf, session, callback, node_name,
|
||||
{}, name or topic, link_opts)
|
||||
|
||||
|
||||
class FanoutConsumer(ConsumerBase):
|
||||
@ -230,52 +273,53 @@ class FanoutConsumer(ConsumerBase):
|
||||
"""
|
||||
self.conf = conf
|
||||
|
||||
super(FanoutConsumer, self).__init__(
|
||||
session, callback,
|
||||
"%s_fanout" % topic,
|
||||
{"durable": False, "type": "fanout"},
|
||||
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
|
||||
{"exclusive": True})
|
||||
link_opts = {"exclusive": True}
|
||||
|
||||
def reconnect(self, session):
|
||||
topic = self.get_node_name().rpartition('_fanout')[0]
|
||||
params = {
|
||||
'session': session,
|
||||
'topic': topic,
|
||||
'callback': self.callback,
|
||||
}
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s_fanout" % topic
|
||||
node_opts = {"durable": False, "type": "fanout"}
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.topic/fanout/%s" % topic
|
||||
node_opts = {}
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
self.__init__(conf=self.conf, **params)
|
||||
|
||||
super(FanoutConsumer, self).reconnect(session)
|
||||
super(FanoutConsumer, self).__init__(conf, session, callback,
|
||||
node_name, node_opts, None,
|
||||
link_opts)
|
||||
|
||||
|
||||
class Publisher(object):
|
||||
"""Base Publisher class."""
|
||||
|
||||
def __init__(self, session, node_name, node_opts=None):
|
||||
def __init__(self, conf, session, node_name, node_opts=None):
|
||||
"""Init the Publisher class with the exchange_name, routing_key,
|
||||
and other options
|
||||
"""
|
||||
self.sender = None
|
||||
self.session = session
|
||||
|
||||
addr_opts = {
|
||||
"create": "always",
|
||||
"node": {
|
||||
"type": "topic",
|
||||
"x-declare": {
|
||||
"durable": False,
|
||||
# auto-delete isn't implemented for exchanges in qpid,
|
||||
# but put in here anyway
|
||||
"auto-delete": True,
|
||||
if conf.qpid_topology_version == 1:
|
||||
addr_opts = {
|
||||
"create": "always",
|
||||
"node": {
|
||||
"type": "topic",
|
||||
"x-declare": {
|
||||
"durable": False,
|
||||
# auto-delete isn't implemented for exchanges in qpid,
|
||||
# but put in here anyway
|
||||
"auto-delete": True,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
if node_opts:
|
||||
addr_opts["node"]["x-declare"].update(node_opts)
|
||||
}
|
||||
if node_opts:
|
||||
addr_opts["node"]["x-declare"].update(node_opts)
|
||||
|
||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||
elif conf.qpid_topology_version == 2:
|
||||
self.address = node_name
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
self.reconnect(session)
|
||||
|
||||
@ -319,39 +363,73 @@ class DirectPublisher(Publisher):
|
||||
"""Publisher class for 'direct'."""
|
||||
def __init__(self, conf, session, msg_id):
|
||||
"""Init a 'direct' publisher."""
|
||||
super(DirectPublisher, self).__init__(session, msg_id,
|
||||
{"type": "direct"})
|
||||
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = msg_id
|
||||
node_opts = {"type": "direct"}
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.direct/%s" % msg_id
|
||||
node_opts = {}
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(DirectPublisher, self).__init__(conf, session, node_name,
|
||||
node_opts)
|
||||
|
||||
|
||||
class TopicPublisher(Publisher):
|
||||
"""Publisher class for 'topic'."""
|
||||
def __init__(self, conf, session, topic):
|
||||
"""init a 'topic' publisher.
|
||||
"""Init a 'topic' publisher.
|
||||
"""
|
||||
exchange_name = rpc_amqp.get_control_exchange(conf)
|
||||
super(TopicPublisher, self).__init__(session,
|
||||
"%s/%s" % (exchange_name, topic))
|
||||
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s/%s" % (exchange_name, topic)
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(TopicPublisher, self).__init__(conf, session, node_name)
|
||||
|
||||
|
||||
class FanoutPublisher(Publisher):
|
||||
"""Publisher class for 'fanout'."""
|
||||
def __init__(self, conf, session, topic):
|
||||
"""init a 'fanout' publisher.
|
||||
"""Init a 'fanout' publisher.
|
||||
"""
|
||||
super(FanoutPublisher, self).__init__(
|
||||
session,
|
||||
"%s_fanout" % topic, {"type": "fanout"})
|
||||
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s_fanout" % topic
|
||||
node_opts = {"type": "fanout"}
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.topic/fanout/%s" % topic
|
||||
node_opts = {}
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(FanoutPublisher, self).__init__(conf, session, node_name,
|
||||
node_opts)
|
||||
|
||||
|
||||
class NotifyPublisher(Publisher):
|
||||
"""Publisher class for notifications."""
|
||||
def __init__(self, conf, session, topic):
|
||||
"""init a 'topic' publisher.
|
||||
"""Init a 'topic' publisher.
|
||||
"""
|
||||
exchange_name = rpc_amqp.get_control_exchange(conf)
|
||||
super(NotifyPublisher, self).__init__(session,
|
||||
"%s/%s" % (exchange_name, topic),
|
||||
{"durable": True})
|
||||
node_opts = {"durable": True}
|
||||
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s/%s" % (exchange_name, topic)
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(NotifyPublisher, self).__init__(conf, session, node_name,
|
||||
node_opts)
|
||||
|
||||
|
||||
class Connection(object):
|
||||
@ -446,7 +524,7 @@ class Connection(object):
|
||||
consumers = self.consumers
|
||||
self.consumers = {}
|
||||
|
||||
for consumer in consumers.itervalues():
|
||||
for consumer in six.itervalues(consumers):
|
||||
consumer.reconnect(self.session)
|
||||
self._register_consumer(consumer)
|
||||
|
||||
@ -604,7 +682,7 @@ class Connection(object):
|
||||
it = self.iterconsume(limit=limit)
|
||||
while True:
|
||||
try:
|
||||
it.next()
|
||||
six.next(it)
|
||||
except StopIteration:
|
||||
return
|
||||
|
||||
@ -665,6 +743,7 @@ class Connection(object):
|
||||
callback=callback,
|
||||
connection_pool=rpc_amqp.get_connection_pool(self.conf,
|
||||
Connection),
|
||||
wait_for_consumers=not ack_on_error
|
||||
)
|
||||
self.proxy_callbacks.append(callback_wrapper)
|
||||
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 Cloudscaling Group, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -25,6 +23,8 @@ import uuid
|
||||
import eventlet
|
||||
import greenlet
|
||||
from oslo.config import cfg
|
||||
import six
|
||||
from six import moves
|
||||
|
||||
from keystone.openstack.common import excutils
|
||||
from keystone.openstack.common.gettextutils import _ # noqa
|
||||
@ -192,7 +192,7 @@ class ZmqSocket(object):
|
||||
# it would be much worse if some of the code calling this
|
||||
# were to fail. For now, lets log, and later evaluate
|
||||
# if we can safely raise here.
|
||||
LOG.error("ZeroMQ socket could not be closed.")
|
||||
LOG.error(_("ZeroMQ socket could not be closed."))
|
||||
self.sock = None
|
||||
|
||||
def recv(self, **kwargs):
|
||||
@ -221,7 +221,7 @@ class ZmqClient(object):
|
||||
return
|
||||
|
||||
rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
|
||||
zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
|
||||
zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())
|
||||
self.outq.send(map(bytes,
|
||||
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
|
||||
|
||||
@ -383,6 +383,7 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
LOG.info(_("In reactor registered"))
|
||||
|
||||
def consume_in_thread(self):
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _consume(sock):
|
||||
LOG.info(_("Consuming socket"))
|
||||
while True:
|
||||
@ -522,8 +523,8 @@ def unflatten_envelope(packenv):
|
||||
h = {}
|
||||
try:
|
||||
while True:
|
||||
k = i.next()
|
||||
h[k] = i.next()
|
||||
k = six.next(i)
|
||||
h[k] = six.next(i)
|
||||
except StopIteration:
|
||||
return h
|
||||
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 Cloudscaling Group, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Cloudscaling Group, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -95,7 +93,7 @@ class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
|
||||
if not redis:
|
||||
raise ImportError("Failed to import module redis.")
|
||||
|
||||
self.redis = redis.StrictRedis(
|
||||
self.redis = redis.Redis(
|
||||
host=CONF.matchmaker_redis.host,
|
||||
port=CONF.matchmaker_redis.port,
|
||||
password=CONF.matchmaker_redis.password)
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011-2013 Cloudscaling Group, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012-2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -21,7 +19,6 @@ For more information about rpc API version numbers, see:
|
||||
rpc/dispatcher.py
|
||||
"""
|
||||
|
||||
|
||||
from keystone.openstack.common import rpc
|
||||
from keystone.openstack.common.rpc import common as rpc_common
|
||||
from keystone.openstack.common.rpc import serializer as rpc_serializer
|
||||
@ -36,7 +33,7 @@ class RpcProxy(object):
|
||||
rpc API.
|
||||
"""
|
||||
|
||||
# The default namespace, which can be overriden in a subclass.
|
||||
# The default namespace, which can be overridden in a subclass.
|
||||
RPC_API_NAMESPACE = None
|
||||
|
||||
def __init__(self, topic, default_version, version_cap=None,
|
||||
|
@ -16,10 +16,12 @@
|
||||
|
||||
import abc
|
||||
|
||||
import six
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Serializer(object):
|
||||
"""Generic (de-)serialization definition base class."""
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@abc.abstractmethod
|
||||
def serialize_entity(self, context, entity):
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
|
3
keystone/openstack/common/rpc/zmq_receiver.py
Executable file → Normal file
3
keystone/openstack/common/rpc/zmq_receiver.py
Executable file → Normal file
@ -1,6 +1,3 @@
|
||||
#!/usr/bin/env python
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# Copyright 2011 Justin Santa Barbara
|
||||
@ -20,6 +18,7 @@
|
||||
"""Generic Node base class for all workers that run on hosts."""
|
||||
|
||||
import errno
|
||||
import logging as std_logging
|
||||
import os
|
||||
import random
|
||||
import signal
|
||||
@ -28,7 +27,6 @@ import time
|
||||
|
||||
import eventlet
|
||||
from eventlet import event
|
||||
import logging as std_logging
|
||||
from oslo.config import cfg
|
||||
|
||||
from keystone.openstack.common import eventlet_backdoor
|
||||
@ -43,6 +41,29 @@ CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _sighup_supported():
|
||||
return hasattr(signal, 'SIGHUP')
|
||||
|
||||
|
||||
def _is_sighup(signo):
|
||||
return _sighup_supported() and signo == signal.SIGHUP
|
||||
|
||||
|
||||
def _signo_to_signame(signo):
|
||||
signals = {signal.SIGTERM: 'SIGTERM',
|
||||
signal.SIGINT: 'SIGINT'}
|
||||
if _sighup_supported():
|
||||
signals[signal.SIGHUP] = 'SIGHUP'
|
||||
return signals[signo]
|
||||
|
||||
|
||||
def _set_signals_handler(handler):
|
||||
signal.signal(signal.SIGTERM, handler)
|
||||
signal.signal(signal.SIGINT, handler)
|
||||
if _sighup_supported():
|
||||
signal.signal(signal.SIGHUP, handler)
|
||||
|
||||
|
||||
class Launcher(object):
|
||||
"""Launch one or more services and wait for them to complete."""
|
||||
|
||||
@ -100,18 +121,13 @@ class SignalExit(SystemExit):
|
||||
class ServiceLauncher(Launcher):
|
||||
def _handle_signal(self, signo, frame):
|
||||
# Allow the process to be killed again and die from natural causes
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGHUP, signal.SIG_DFL)
|
||||
|
||||
_set_signals_handler(signal.SIG_DFL)
|
||||
raise SignalExit(signo)
|
||||
|
||||
def handle_signal(self):
|
||||
signal.signal(signal.SIGTERM, self._handle_signal)
|
||||
signal.signal(signal.SIGINT, self._handle_signal)
|
||||
signal.signal(signal.SIGHUP, self._handle_signal)
|
||||
_set_signals_handler(self._handle_signal)
|
||||
|
||||
def _wait_for_exit_or_signal(self):
|
||||
def _wait_for_exit_or_signal(self, ready_callback=None):
|
||||
status = None
|
||||
signo = 0
|
||||
|
||||
@ -119,11 +135,11 @@ class ServiceLauncher(Launcher):
|
||||
CONF.log_opt_values(LOG, std_logging.DEBUG)
|
||||
|
||||
try:
|
||||
if ready_callback:
|
||||
ready_callback()
|
||||
super(ServiceLauncher, self).wait()
|
||||
except SignalExit as exc:
|
||||
signame = {signal.SIGTERM: 'SIGTERM',
|
||||
signal.SIGINT: 'SIGINT',
|
||||
signal.SIGHUP: 'SIGHUP'}[exc.signo]
|
||||
signame = _signo_to_signame(exc.signo)
|
||||
LOG.info(_('Caught %s, exiting'), signame)
|
||||
status = exc.code
|
||||
signo = exc.signo
|
||||
@ -140,11 +156,11 @@ class ServiceLauncher(Launcher):
|
||||
|
||||
return status, signo
|
||||
|
||||
def wait(self):
|
||||
def wait(self, ready_callback=None):
|
||||
while True:
|
||||
self.handle_signal()
|
||||
status, signo = self._wait_for_exit_or_signal()
|
||||
if signo != signal.SIGHUP:
|
||||
status, signo = self._wait_for_exit_or_signal(ready_callback)
|
||||
if not _is_sighup(signo):
|
||||
return status
|
||||
self.restart()
|
||||
|
||||
@ -167,18 +183,14 @@ class ProcessLauncher(object):
|
||||
self.handle_signal()
|
||||
|
||||
def handle_signal(self):
|
||||
signal.signal(signal.SIGTERM, self._handle_signal)
|
||||
signal.signal(signal.SIGINT, self._handle_signal)
|
||||
signal.signal(signal.SIGHUP, self._handle_signal)
|
||||
_set_signals_handler(self._handle_signal)
|
||||
|
||||
def _handle_signal(self, signo, frame):
|
||||
self.sigcaught = signo
|
||||
self.running = False
|
||||
|
||||
# Allow the process to be killed again and die from natural causes
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGHUP, signal.SIG_DFL)
|
||||
_set_signals_handler(signal.SIG_DFL)
|
||||
|
||||
def _pipe_watcher(self):
|
||||
# This will block until the write end is closed when the parent
|
||||
@ -200,20 +212,22 @@ class ProcessLauncher(object):
|
||||
raise SignalExit(signal.SIGHUP)
|
||||
|
||||
signal.signal(signal.SIGTERM, _sigterm)
|
||||
signal.signal(signal.SIGHUP, _sighup)
|
||||
if _sighup_supported():
|
||||
signal.signal(signal.SIGHUP, _sighup)
|
||||
# Block SIGINT and let the parent send us a SIGTERM
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
|
||||
def _child_wait_for_exit_or_signal(self, launcher):
|
||||
status = None
|
||||
status = 0
|
||||
signo = 0
|
||||
|
||||
# NOTE(johannes): All exceptions are caught to ensure this
|
||||
# doesn't fallback into the loop spawning children. It would
|
||||
# be bad for a child to spawn more children.
|
||||
try:
|
||||
launcher.wait()
|
||||
except SignalExit as exc:
|
||||
signame = {signal.SIGTERM: 'SIGTERM',
|
||||
signal.SIGINT: 'SIGINT',
|
||||
signal.SIGHUP: 'SIGHUP'}[exc.signo]
|
||||
signame = _signo_to_signame(exc.signo)
|
||||
LOG.info(_('Caught %s, exiting'), signame)
|
||||
status = exc.code
|
||||
signo = exc.signo
|
||||
@ -262,14 +276,11 @@ class ProcessLauncher(object):
|
||||
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
# NOTE(johannes): All exceptions are caught to ensure this
|
||||
# doesn't fallback into the loop spawning children. It would
|
||||
# be bad for a child to spawn more children.
|
||||
launcher = self._child_process(wrap.service)
|
||||
while True:
|
||||
self._child_process_handle_signal()
|
||||
status, signo = self._child_wait_for_exit_or_signal(launcher)
|
||||
if signo != signal.SIGHUP:
|
||||
if not _is_sighup(signo):
|
||||
break
|
||||
launcher.restart()
|
||||
|
||||
@ -339,11 +350,9 @@ class ProcessLauncher(object):
|
||||
self.handle_signal()
|
||||
self._respawn_children()
|
||||
if self.sigcaught:
|
||||
signame = {signal.SIGTERM: 'SIGTERM',
|
||||
signal.SIGINT: 'SIGINT',
|
||||
signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
|
||||
signame = _signo_to_signame(self.sigcaught)
|
||||
LOG.info(_('Caught %s, stopping children'), signame)
|
||||
if self.sigcaught != signal.SIGHUP:
|
||||
if not _is_sighup(self.sigcaught):
|
||||
break
|
||||
|
||||
for pid in self.children:
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
|
52
keystone/openstack/common/test.py
Normal file
52
keystone/openstack/common/test.py
Normal file
@ -0,0 +1,52 @@
|
||||
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Common utilities used in testing"""
|
||||
|
||||
import os
|
||||
|
||||
import fixtures
|
||||
import testtools
|
||||
|
||||
_TRUE_VALUES = ('True', 'true', '1', 'yes')
|
||||
|
||||
|
||||
class BaseTestCase(testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(BaseTestCase, self).setUp()
|
||||
self._set_timeout()
|
||||
self._fake_output()
|
||||
self.useFixture(fixtures.FakeLogger('keystone.openstack.common'))
|
||||
self.useFixture(fixtures.NestedTempfile())
|
||||
self.useFixture(fixtures.TempHomeDir())
|
||||
|
||||
def _set_timeout(self):
|
||||
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
|
||||
try:
|
||||
test_timeout = int(test_timeout)
|
||||
except ValueError:
|
||||
# If timeout value is invalid do not set a timeout.
|
||||
test_timeout = 0
|
||||
if test_timeout > 0:
|
||||
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
|
||||
|
||||
def _fake_output(self):
|
||||
if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
|
||||
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
|
||||
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
|
||||
if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
|
||||
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
|
||||
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -48,6 +46,9 @@ class Thread(object):
|
||||
def wait(self):
|
||||
return self.thread.wait()
|
||||
|
||||
def link(self, func, *args, **kwargs):
|
||||
self.thread.link(func, *args, **kwargs)
|
||||
|
||||
|
||||
class ThreadGroup(object):
|
||||
"""The point of the ThreadGroup classis to:
|
||||
@ -79,6 +80,7 @@ class ThreadGroup(object):
|
||||
gt = self.pool.spawn(callback, *args, **kwargs)
|
||||
th = Thread(gt, self)
|
||||
self.threads.append(th)
|
||||
return th
|
||||
|
||||
def thread_done(self, thread):
|
||||
self.threads.remove(thread)
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
@ -21,6 +19,7 @@ Time related utilities and helper functions.
|
||||
|
||||
import calendar
|
||||
import datetime
|
||||
import time
|
||||
|
||||
import iso8601
|
||||
import six
|
||||
@ -49,9 +48,9 @@ def parse_isotime(timestr):
|
||||
try:
|
||||
return iso8601.parse_date(timestr)
|
||||
except iso8601.ParseError as e:
|
||||
raise ValueError(unicode(e))
|
||||
raise ValueError(six.text_type(e))
|
||||
except TypeError as e:
|
||||
raise ValueError(unicode(e))
|
||||
raise ValueError(six.text_type(e))
|
||||
|
||||
|
||||
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
|
||||
@ -90,6 +89,11 @@ def is_newer_than(after, seconds):
|
||||
|
||||
def utcnow_ts():
|
||||
"""Timestamp version of our utcnow function."""
|
||||
if utcnow.override_time is None:
|
||||
# NOTE(kgriffs): This is several times faster
|
||||
# than going through calendar.timegm(...)
|
||||
return int(time.time())
|
||||
|
||||
return calendar.timegm(utcnow().timetuple())
|
||||
|
||||
|
||||
@ -111,12 +115,15 @@ def iso8601_from_timestamp(timestamp):
|
||||
utcnow.override_time = None
|
||||
|
||||
|
||||
def set_time_override(override_time=datetime.datetime.utcnow()):
|
||||
def set_time_override(override_time=None):
|
||||
"""Overrides utils.utcnow.
|
||||
|
||||
Make it return a constant time or a list thereof, one at a time.
|
||||
|
||||
:param override_time: datetime instance or list thereof. If not
|
||||
given, defaults to the current UTC time.
|
||||
"""
|
||||
utcnow.override_time = override_time
|
||||
utcnow.override_time = override_time or datetime.datetime.utcnow()
|
||||
|
||||
|
||||
def advance_time_delta(timedelta):
|
||||
@ -169,6 +176,15 @@ def delta_seconds(before, after):
|
||||
datetime objects (as a float, to microsecond resolution).
|
||||
"""
|
||||
delta = after - before
|
||||
return total_seconds(delta)
|
||||
|
||||
|
||||
def total_seconds(delta):
|
||||
"""Return the total seconds of datetime.timedelta object.
|
||||
|
||||
Compute total seconds of datetime.timedelta, datetime.timedelta
|
||||
doesn't have method total_seconds in Python2.6, calculate it manually.
|
||||
"""
|
||||
try:
|
||||
return delta.total_seconds()
|
||||
except AttributeError:
|
||||
|
43
keystone/openstack/common/versionutils.py
Normal file
43
keystone/openstack/common/versionutils.py
Normal file
@ -0,0 +1,43 @@
|
||||
# Copyright (c) 2013 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Helpers for comparing version strings.
|
||||
"""
|
||||
|
||||
import pkg_resources
|
||||
|
||||
|
||||
def is_compatible(requested_version, current_version, same_major=True):
|
||||
"""Determine whether `requested_version` is satisfied by
|
||||
`current_version`; in other words, `current_version` is >=
|
||||
`requested_version`.
|
||||
|
||||
:param requested_version: version to check for compatibility
|
||||
:param current_version: version to check against
|
||||
:param same_major: if True, the major version must be identical between
|
||||
`requested_version` and `current_version`. This is used when a
|
||||
major-version difference indicates incompatibility between the two
|
||||
versions. Since this is the common-case in practice, the default is
|
||||
True.
|
||||
:returns: True if compatible, False if not
|
||||
"""
|
||||
requested_parts = pkg_resources.parse_version(requested_version)
|
||||
current_parts = pkg_resources.parse_version(current_version)
|
||||
|
||||
if same_major and (requested_parts[0] != current_parts[0]):
|
||||
return False
|
||||
|
||||
return current_parts >= requested_parts
|
@ -1,5 +1,4 @@
|
||||
#!/usr/bin/env python
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2013, Nebula, Inc.
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
@ -47,6 +46,7 @@ import subunit
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
import six
|
||||
import testtools
|
||||
|
||||
|
||||
@ -274,7 +274,7 @@ class OpenStackTestResult(testtools.TestResult):
|
||||
self.stopTestRun()
|
||||
|
||||
def stopTestRun(self):
|
||||
for cls in list(self.results.iterkeys()):
|
||||
for cls in list(six.iterkeys(self.results)):
|
||||
self.writeTestCase(cls)
|
||||
self.stream.writeln()
|
||||
self.writeSlowTests()
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
# Copyright 2013 IBM Corp.
|
||||
#
|
||||
@ -114,12 +112,12 @@ class InstallVenv(object):
|
||||
print('Installing dependencies with pip (this can take a while)...')
|
||||
|
||||
# First things first, make sure our venv has the latest pip and
|
||||
# setuptools.
|
||||
self.pip_install('pip>=1.3')
|
||||
# setuptools and pbr
|
||||
self.pip_install('pip>=1.4')
|
||||
self.pip_install('setuptools')
|
||||
self.pip_install('pbr')
|
||||
|
||||
self.pip_install('-r', self.requirements)
|
||||
self.pip_install('-r', self.test_requirements)
|
||||
self.pip_install('-r', self.requirements, '-r', self.test_requirements)
|
||||
|
||||
def parse_args(self, argv):
|
||||
"""Parses command-line arguments."""
|
||||
|
Loading…
x
Reference in New Issue
Block a user