Migrated to Murano Common

Replaced old RabbitMQ code over amqplib to Murano Common MqClient

Change-Id: Icf721f9304f65df02231bde77c8472fd6e20c90d
This commit is contained in:
Serg Melikyan 2013-07-23 14:29:18 +04:00
parent 7c86a1f411
commit 072b593de6
55 changed files with 1342 additions and 1091 deletions

View File

@ -1,4 +1,4 @@
[pipeline:murano-api] [pipeline:api.py]
pipeline = authtoken context apiv1app pipeline = authtoken context apiv1app
[app:apiv1app] [app:apiv1app]

View File

@ -14,3 +14,7 @@
import gettext import gettext
gettext.install('muranoapi', './muranoapi/locale', unicode=1) gettext.install('muranoapi', './muranoapi/locale', unicode=1)
from pbr import version
__version_info = version.VersionInfo('muranoapi')
__version__ = __version_info.cached_version_string()

View File

@ -12,11 +12,10 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import eventlet
from muranoapi.common.utils import build_entity_map
from sqlalchemy import desc from sqlalchemy import desc
from webob import exc from webob import exc
from muranoapi.common import config from muranoapi.common import config
from muranoapi.common.utils import build_entity_map
from muranoapi.db.session import get_session from muranoapi.db.session import get_session
from muranoapi.db.models import Environment, Status from muranoapi.db.models import Environment, Status
from muranoapi.db.services.core_services import CoreServices from muranoapi.db.services.core_services import CoreServices
@ -24,7 +23,6 @@ from muranoapi.db.services.environments import EnvironmentServices
from muranoapi.openstack.common import wsgi from muranoapi.openstack.common import wsgi
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
rabbitmq = config.CONF.rabbitmq rabbitmq = config.CONF.rabbitmq
log = logging.getLogger(__name__) log = logging.getLogger(__name__)

View File

@ -11,7 +11,3 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from muranoapi.openstack.common import version as common_version
version_info = common_version.VersionInfo('muranoapi')

View File

@ -14,17 +14,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os
import sys import sys
# If ../muranoapi/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'muranoapi', '__init__.py')):
sys.path.insert(0, possible_topdir)
from muranoapi.common import config from muranoapi.common import config
from muranoapi.common.service import TaskResultHandlerService from muranoapi.common.service import TaskResultHandlerService
from muranoapi.openstack.common import log from muranoapi.openstack.common import log
@ -32,7 +23,7 @@ from muranoapi.openstack.common import wsgi
from muranoapi.openstack.common import service from muranoapi.openstack.common import service
if __name__ == '__main__': def main():
try: try:
config.parse_args() config.parse_args()
log.setup('muranoapi') log.setup('muranoapi')
@ -49,3 +40,7 @@ if __name__ == '__main__':
except RuntimeError, e: except RuntimeError, e:
sys.stderr.write("ERROR: %s\n" % e) sys.stderr.write("ERROR: %s\n" % e)
sys.exit(1) sys.exit(1)
if __name__ == '__main__':
main()

View File

@ -30,7 +30,7 @@ from oslo.config import cfg
from paste import deploy from paste import deploy
from muranoapi.openstack.common import log from muranoapi.openstack.common import log
from muranoapi.version import version_info as version from muranoapi import __version__ as version
paste_deploy_opts = [ paste_deploy_opts = [
cfg.StrOpt('flavor'), cfg.StrOpt('flavor'),
@ -84,8 +84,7 @@ CONF.import_opt('syslog_log_facility', 'muranoapi.openstack.common.log')
cfg.set_defaults(log.log_opts, cfg.set_defaults(log.log_opts,
default_log_levels=['amqplib=WARN', default_log_levels=['qpid.messaging=INFO',
'qpid.messaging=INFO',
'sqlalchemy=WARN', 'sqlalchemy=WARN',
'keystoneclient=INFO', 'keystoneclient=INFO',
'eventlet.wsgi.server=WARN']) 'eventlet.wsgi.server=WARN'])
@ -94,7 +93,7 @@ cfg.set_defaults(log.log_opts,
def parse_args(args=None, usage=None, default_config_files=None): def parse_args(args=None, usage=None, default_config_files=None):
CONF(args=args, CONF(args=args,
project='muranoapi', project='muranoapi',
version=version.cached_version_string(), version=version,
usage=usage, usage=usage,
default_config_files=default_config_files) default_config_files=default_config_files)

View File

@ -12,75 +12,63 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import socket from muranoapi.common.utils import handle
from amqplib.client_0_8 import AMQPConnectionException
import anyjson
import eventlet
from muranoapi.common.utils import retry, handle
from muranoapi.db.models import Status, Session, Environment, Deployment from muranoapi.db.models import Status, Session, Environment, Deployment
from muranoapi.db.session import get_session from muranoapi.db.session import get_session
from muranoapi.openstack.common import log as logging, timeutils from muranoapi.openstack.common import log as logging, timeutils, service
from muranoapi.common import config from muranoapi.common import config
from muranocommon.mq import MqClient
from sqlalchemy import desc from sqlalchemy import desc
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
conf = config.CONF.reports conf = config.CONF.reports
rabbitmq = config.CONF.rabbitmq rabbitmq = config.CONF.rabbitmq
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class TaskResultHandlerService(): class TaskResultHandlerService(service.Service):
thread = None connection_params = {
'login': rabbitmq.login,
'password': rabbitmq.password,
'host': rabbitmq.host,
'port': rabbitmq.port,
'virtual_host': rabbitmq.virtual_host
}
def __init__(self):
super(TaskResultHandlerService, self).__init__()
def start(self): def start(self):
self.thread = eventlet.spawn(self.connect) super(TaskResultHandlerService, self).start()
self.tg.add_thread(self._start_rabbitmq)
def stop(self): def stop(self):
pass super(TaskResultHandlerService, self).stop()
def wait(self): def _start_rabbitmq(self):
self.thread.wait() while True:
try:
@retry((socket.error, AMQPConnectionException), tries=-1) with MqClient(**self.connection_params) as mqClient:
def connect(self): mqClient.declare(conf.results_exchange, conf.results_queue)
connection = amqp.Connection('{0}:{1}'. mqClient.declare(conf.reports_exchange, conf.reports_queue)
format(rabbitmq.host, rabbitmq.port), with mqClient.open(conf.results_queue) as results_sb:
virtual_host=rabbitmq.virtual_host, with mqClient.open(conf.reports_queue) as reports_sb:
userid=rabbitmq.login, while True:
password=rabbitmq.password, report = reports_sb.get_message(timeout=1000)
ssl=rabbitmq.use_ssl, insist=True) self.tg.add_thread(handle_report, report.body)
ch = connection.channel() result = results_sb.get_message(timeout=1000)
self.tg.add_thread(handle_result, result.body)
def bind(exchange, queue): except Exception as ex:
if not exchange: log.exception(ex)
ch.exchange_declare(exchange, 'direct', durable=True,
auto_delete=False)
ch.queue_declare(queue, durable=True, auto_delete=False)
if not exchange:
ch.queue_bind(queue, exchange, queue)
bind(conf.results_exchange, conf.results_queue)
bind(conf.reports_exchange, conf.reports_queue)
ch.basic_consume(conf.results_exchange, callback=handle_result)
ch.basic_consume(conf.reports_exchange, callback=handle_report,
no_ack=True)
while ch.callbacks:
ch.wait()
@handle @handle
def handle_result(msg): def handle_result(environment_result):
log.debug(_('Got result message from ' log.debug(_('Got result message from '
'orchestration engine:\n{0}'.format(msg.body))) 'orchestration engine:\n{0}'.format(environment_result)))
environment_result = anyjson.deserialize(msg.body)
if 'deleted' in environment_result: if 'deleted' in environment_result:
log.debug(_('Result for environment {0} is dropped. Environment ' log.debug(_('Result for environment {0} is dropped. Environment '
'is deleted'.format(environment_result['id']))) 'is deleted'.format(environment_result['id'])))
msg.channel.basic_ack(msg.delivery_tag)
return return
session = get_session() session = get_session()
@ -108,20 +96,18 @@ def handle_result(msg):
status.text = "Deployment finished" status.text = "Deployment finished"
deployment.statuses.append(status) deployment.statuses.append(status)
deployment.save(session) deployment.save(session)
msg.channel.basic_ack(msg.delivery_tag)
@handle @handle
def handle_report(msg): def handle_report(report):
log.debug(_('Got report message from orchestration ' log.debug(_('Got report message from orchestration '
'engine:\n{0}'.format(msg.body))) 'engine:\n{0}'.format(report)))
params = anyjson.deserialize(msg.body) report['entity_id'] = report['id']
params['entity_id'] = params['id'] del report['id']
del params['id']
status = Status() status = Status()
status.update(params) status.update(report)
session = get_session() session = get_session()
#connect with deployment #connect with deployment

View File

@ -13,18 +13,15 @@
# under the License. # under the License.
from collections import namedtuple from collections import namedtuple
from amqplib.client_0_8 import Message
import anyjson
import eventlet
from jsonschema import validate from jsonschema import validate
from muranoapi.api.v1.schemas import ENV_SCHEMA from muranoapi.api.v1.schemas import ENV_SCHEMA
from muranoapi.common import config from muranoapi.common import config
from muranoapi.db.models import Session, Environment from muranoapi.db.models import Session, Environment
from muranoapi.db.services.sessions import SessionServices, SessionState
from muranoapi.db.session import get_session from muranoapi.db.session import get_session
from sessions import SessionServices, SessionState from muranocommon.mq import MqClient, Message
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
rabbitmq = config.CONF.rabbitmq rabbitmq = config.CONF.rabbitmq
EnvironmentStatus = namedtuple('EnvironmentStatus', [ EnvironmentStatus = namedtuple('EnvironmentStatus', [
@ -122,18 +119,20 @@ class EnvironmentServices(object):
#Set X-Auth-Token for conductor #Set X-Auth-Token for conductor
env['token'] = token env['token'] = token
connection = amqp.Connection('{0}:{1}'. message = Message()
format(rabbitmq.host, rabbitmq.port), message.body = env
virtual_host=rabbitmq.virtual_host,
userid=rabbitmq.login,
password=rabbitmq.password,
ssl=rabbitmq.use_ssl, insist=True)
channel = connection.channel()
channel.exchange_declare('tasks', 'direct', durable=True,
auto_delete=False)
channel.basic_publish(Message(body=anyjson.serialize(env)), 'tasks', connection_params = {
'tasks') 'login': rabbitmq.login,
'password': rabbitmq.password,
'host': rabbitmq.host,
'port': rabbitmq.port,
'virtual_host': rabbitmq.virtual_host
}
with MqClient(**connection_params) as mqClient:
mqClient.declare('tasks', 'tasks')
mqClient.send(message, 'tasks', 'tasks')
@staticmethod @staticmethod
def get_environment_description(environment_id, session_id=None): def get_environment_description(environment_id, session_id=None):

View File

@ -13,15 +13,12 @@
# under the License. # under the License.
from collections import namedtuple from collections import namedtuple
from amqplib.client_0_8 import Message
import anyjson
import eventlet
from muranoapi.common import config from muranoapi.common import config
from muranoapi.db.models import Session, Environment, Deployment, Status from muranoapi.db.models import Session, Environment, Deployment, Status
from muranoapi.db.session import get_session from muranoapi.db.session import get_session
from muranocommon.mq import MqClient, Message
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
rabbitmq = config.CONF.rabbitmq rabbitmq = config.CONF.rabbitmq
SessionState = namedtuple('SessionState', ['open', 'deploying', 'deployed'])( SessionState = namedtuple('SessionState', ['open', 'deploying', 'deployed'])(
@ -136,16 +133,17 @@ class SessionServices(object):
unit.add(session) unit.add(session)
unit.add(deployment) unit.add(deployment)
connection = amqp.Connection('{0}:{1}'. message = Message()
format(rabbitmq.host, rabbitmq.port), message.body = environment
virtual_host=rabbitmq.virtual_host,
userid=rabbitmq.login,
password=rabbitmq.password,
ssl=rabbitmq.use_ssl, insist=True)
channel = connection.channel()
channel.exchange_declare('tasks', 'direct', durable=True,
auto_delete=False)
channel.basic_publish( connection_params = {
Message(body=anyjson.serialize(environment)), 'tasks', 'tasks' 'login': rabbitmq.login,
) 'password': rabbitmq.password,
'host': rabbitmq.host,
'port': rabbitmq.port,
'virtual_host': rabbitmq.virtual_host
}
with MqClient(**connection_params) as mqClient:
mqClient.declare('tasks', 'tasks')
mqClient.send(message, 'tasks', 'tasks')

53
muranoapi/openstack/common/config/generator.py Executable file → Normal file
View File

@ -1,4 +1,3 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 SINA Corporation # Copyright 2012 SINA Corporation
@ -16,10 +15,11 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
# #
# @author: Zhongyue Luo, SINA Corporation.
#
"""Extracts OpenStack config option info from module(s).""" """Extracts OpenStack config option info from module(s)."""
from __future__ import print_function
import imp import imp
import os import os
import re import re
@ -50,7 +50,6 @@ OPT_TYPES = {
MULTISTROPT: 'multi valued', MULTISTROPT: 'multi valued',
} }
OPTION_COUNT = 0
OPTION_REGEX = re.compile(r"(%s)" % "|".join([STROPT, BOOLOPT, INTOPT, OPTION_REGEX = re.compile(r"(%s)" % "|".join([STROPT, BOOLOPT, INTOPT,
FLOATOPT, LISTOPT, FLOATOPT, LISTOPT,
MULTISTROPT])) MULTISTROPT]))
@ -97,8 +96,6 @@ def generate(srcfiles):
for group, opts in opts_by_group.items(): for group, opts in opts_by_group.items():
print_group_opts(group, opts) print_group_opts(group, opts)
print "# Total option count: %d" % OPTION_COUNT
def _import_module(mod_str): def _import_module(mod_str):
try: try:
@ -161,18 +158,16 @@ def _list_opts(obj):
def print_group_opts(group, opts_by_module): def print_group_opts(group, opts_by_module):
print "[%s]" % group print("[%s]" % group)
print print('')
global OPTION_COUNT
for mod, opts in opts_by_module: for mod, opts in opts_by_module:
OPTION_COUNT += len(opts) print('#')
print '#' print('# Options defined in %s' % mod)
print '# Options defined in %s' % mod print('#')
print '#' print('')
print
for opt in opts: for opt in opts:
_print_opt(opt) _print_opt(opt)
print print('')
def _get_my_ip(): def _get_my_ip():
@ -188,7 +183,12 @@ def _get_my_ip():
def _sanitize_default(s): def _sanitize_default(s):
"""Set up a reasonably sensible default for pybasedir, my_ip and host.""" """Set up a reasonably sensible default for pybasedir, my_ip and host."""
if s.startswith(BASEDIR): if s.startswith(sys.prefix):
# NOTE(jd) Don't use os.path.join, because it is likely to think the
# second part is an absolute pathname and therefore drop the first
# part.
s = os.path.normpath("/usr/" + s[len(sys.prefix):])
elif s.startswith(BASEDIR):
return s.replace(BASEDIR, '/usr/lib/python/site-packages') return s.replace(BASEDIR, '/usr/lib/python/site-packages')
elif BASEDIR in s: elif BASEDIR in s:
return s.replace(BASEDIR, '') return s.replace(BASEDIR, '')
@ -205,6 +205,7 @@ def _print_opt(opt):
opt_name, opt_default, opt_help = opt.dest, opt.default, opt.help opt_name, opt_default, opt_help = opt.dest, opt.default, opt.help
if not opt_help: if not opt_help:
sys.stderr.write('WARNING: "%s" is missing help string.\n' % opt_name) sys.stderr.write('WARNING: "%s" is missing help string.\n' % opt_name)
opt_help = ""
opt_type = None opt_type = None
try: try:
opt_type = OPTION_REGEX.search(str(type(opt))).group(0) opt_type = OPTION_REGEX.search(str(type(opt))).group(0)
@ -212,33 +213,33 @@ def _print_opt(opt):
sys.stderr.write("%s\n" % str(err)) sys.stderr.write("%s\n" % str(err))
sys.exit(1) sys.exit(1)
opt_help += ' (' + OPT_TYPES[opt_type] + ')' opt_help += ' (' + OPT_TYPES[opt_type] + ')'
print '#', "\n# ".join(textwrap.wrap(opt_help, WORDWRAP_WIDTH)) print('#', "\n# ".join(textwrap.wrap(opt_help, WORDWRAP_WIDTH)))
try: try:
if opt_default is None: if opt_default is None:
print '#%s=<None>' % opt_name print('#%s=<None>' % opt_name)
elif opt_type == STROPT: elif opt_type == STROPT:
assert(isinstance(opt_default, basestring)) assert(isinstance(opt_default, basestring))
print '#%s=%s' % (opt_name, _sanitize_default(opt_default)) print('#%s=%s' % (opt_name, _sanitize_default(opt_default)))
elif opt_type == BOOLOPT: elif opt_type == BOOLOPT:
assert(isinstance(opt_default, bool)) assert(isinstance(opt_default, bool))
print '#%s=%s' % (opt_name, str(opt_default).lower()) print('#%s=%s' % (opt_name, str(opt_default).lower()))
elif opt_type == INTOPT: elif opt_type == INTOPT:
assert(isinstance(opt_default, int) and assert(isinstance(opt_default, int) and
not isinstance(opt_default, bool)) not isinstance(opt_default, bool))
print '#%s=%s' % (opt_name, opt_default) print('#%s=%s' % (opt_name, opt_default))
elif opt_type == FLOATOPT: elif opt_type == FLOATOPT:
assert(isinstance(opt_default, float)) assert(isinstance(opt_default, float))
print '#%s=%s' % (opt_name, opt_default) print('#%s=%s' % (opt_name, opt_default))
elif opt_type == LISTOPT: elif opt_type == LISTOPT:
assert(isinstance(opt_default, list)) assert(isinstance(opt_default, list))
print '#%s=%s' % (opt_name, ','.join(opt_default)) print('#%s=%s' % (opt_name, ','.join(opt_default)))
elif opt_type == MULTISTROPT: elif opt_type == MULTISTROPT:
assert(isinstance(opt_default, list)) assert(isinstance(opt_default, list))
if not opt_default: if not opt_default:
opt_default = [''] opt_default = ['']
for default in opt_default: for default in opt_default:
print '#%s=%s' % (opt_name, default) print('#%s=%s' % (opt_name, default))
print print('')
except Exception: except Exception:
sys.stderr.write('Error in option "%s"\n' % opt_name) sys.stderr.write('Error in option "%s"\n' % opt_name)
sys.exit(1) sys.exit(1)
@ -246,7 +247,7 @@ def _print_opt(opt):
def main(): def main():
if len(sys.argv) < 2: if len(sys.argv) < 2:
print "usage: %s [srcfile]...\n" % sys.argv[0] print("usage: %s [srcfile]...\n" % sys.argv[0])
sys.exit(0) sys.exit(0)
generate(sys.argv[1:]) generate(sys.argv[1:])

View File

@ -61,7 +61,7 @@ class RequestContext(object):
'request_id': self.request_id} 'request_id': self.request_id}
def get_admin_context(show_deleted="no"): def get_admin_context(show_deleted=False):
context = RequestContext(None, context = RequestContext(None,
tenant=None, tenant=None,
is_admin=True, is_admin=True,

View File

@ -18,7 +18,7 @@
"""DB related custom exceptions.""" """DB related custom exceptions."""
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
class DBError(Exception): class DBError(Exception):

View File

@ -0,0 +1,159 @@
# coding: utf-8
#
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Base on code in migrate/changeset/databases/sqlite.py which is under
# the following license:
#
# The MIT License
#
# Copyright (c) 2009 Evan Rosson, Jan Dittberner, Domen Kožar
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
import re
from migrate.changeset import ansisql
from migrate.changeset.databases import sqlite
from sqlalchemy.schema import UniqueConstraint
def _get_unique_constraints(self, table):
"""Retrieve information about existing unique constraints of the table
This feature is needed for _recreate_table() to work properly.
Unfortunately, it's not available in sqlalchemy 0.7.x/0.8.x.
"""
data = table.metadata.bind.execute(
"""SELECT sql
FROM sqlite_master
WHERE
type='table' AND
name=:table_name""",
table_name=table.name
).fetchone()[0]
UNIQUE_PATTERN = "CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)"
return [
UniqueConstraint(
*[getattr(table.columns, c.strip(' "')) for c in cols.split(",")],
name=name
)
for name, cols in re.findall(UNIQUE_PATTERN, data)
]
def _recreate_table(self, table, column=None, delta=None, omit_uniques=None):
"""Recreate the table properly
Unlike the corresponding original method of sqlalchemy-migrate this one
doesn't drop existing unique constraints when creating a new one.
"""
table_name = self.preparer.format_table(table)
# we remove all indexes so as not to have
# problems during copy and re-create
for index in table.indexes:
index.drop()
# reflect existing unique constraints
for uc in self._get_unique_constraints(table):
table.append_constraint(uc)
# omit given unique constraints when creating a new table if required
table.constraints = set([
cons for cons in table.constraints
if omit_uniques is None or cons.name not in omit_uniques
])
self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name)
self.execute()
insertion_string = self._modify_table(table, column, delta)
table.create(bind=self.connection)
self.append(insertion_string % {'table_name': table_name})
self.execute()
self.append('DROP TABLE migration_tmp')
self.execute()
def _visit_migrate_unique_constraint(self, *p, **k):
"""Drop the given unique constraint
The corresponding original method of sqlalchemy-migrate just
raises NotImplemented error
"""
self.recreate_table(p[0].table, omit_uniques=[p[0].name])
def patch_migrate():
"""A workaround for SQLite's inability to alter things
SQLite abilities to alter tables are very limited (please read
http://www.sqlite.org/lang_altertable.html for more details).
E. g. one can't drop a column or a constraint in SQLite. The
workaround for this is to recreate the original table omitting
the corresponding constraint (or column).
sqlalchemy-migrate library has recreate_table() method that
implements this workaround, but it does it wrong:
- information about unique constraints of a table
is not retrieved. So if you have a table with one
unique constraint and a migration adding another one
you will end up with a table that has only the
latter unique constraint, and the former will be lost
- dropping of unique constraints is not supported at all
The proper way to fix this is to provide a pull-request to
sqlalchemy-migrate, but the project seems to be dead. So we
can go on with monkey-patching of the lib at least for now.
"""
# this patch is needed to ensure that recreate_table() doesn't drop
# existing unique constraints of the table when creating a new one
helper_cls = sqlite.SQLiteHelper
helper_cls.recreate_table = _recreate_table
helper_cls._get_unique_constraints = _get_unique_constraints
# this patch is needed to be able to drop existing unique constraints
constraint_cls = sqlite.SQLiteConstraintDropper
constraint_cls.visit_migrate_unique_constraint = \
_visit_migrate_unique_constraint
constraint_cls.__bases__ = (ansisql.ANSIColumnDropper,
sqlite.SQLiteConstraintGenerator)

View File

@ -22,11 +22,13 @@
SQLAlchemy models. SQLAlchemy models.
""" """
import six
from sqlalchemy import Column, Integer from sqlalchemy import Column, Integer
from sqlalchemy import DateTime from sqlalchemy import DateTime
from sqlalchemy.orm import object_mapper from sqlalchemy.orm import object_mapper
from muranoapi.openstack.common.db.sqlalchemy.session import get_session from muranoapi.openstack.common.db.sqlalchemy import session as sa
from muranoapi.openstack.common import timeutils from muranoapi.openstack.common import timeutils
@ -37,7 +39,7 @@ class ModelBase(object):
def save(self, session=None): def save(self, session=None):
"""Save this object.""" """Save this object."""
if not session: if not session:
session = get_session() session = sa.get_session()
# NOTE(boris-42): This part of code should be look like: # NOTE(boris-42): This part of code should be look like:
# sesssion.add(self) # sesssion.add(self)
# session.flush() # session.flush()
@ -70,12 +72,12 @@ class ModelBase(object):
return self return self
def next(self): def next(self):
n = self._i.next() n = six.advance_iterator(self._i)
return n, getattr(self, n) return n, getattr(self, n)
def update(self, values): def update(self, values):
"""Make the model object behave like a dict.""" """Make the model object behave like a dict."""
for k, v in values.iteritems(): for k, v in six.iteritems(values):
setattr(self, k, v) setattr(self, k, v)
def iteritems(self): def iteritems(self):
@ -84,7 +86,7 @@ class ModelBase(object):
Includes attributes from joins. Includes attributes from joins.
""" """
local = dict(self) local = dict(self)
joined = dict([(k, v) for k, v in self.__dict__.iteritems() joined = dict([(k, v) for k, v in six.iteritems(self.__dict__)
if not k[0] == '_']) if not k[0] == '_'])
local.update(joined) local.update(joined)
return local.iteritems() return local.iteritems()

View File

@ -256,12 +256,10 @@ from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column from sqlalchemy.sql.expression import literal_column
from muranoapi.openstack.common.db import exception from muranoapi.openstack.common.db import exception
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
from muranoapi.openstack.common import timeutils from muranoapi.openstack.common import timeutils
DEFAULT = 'DEFAULT'
sqlite_db_opts = [ sqlite_db_opts = [
cfg.StrOpt('sqlite_db', cfg.StrOpt('sqlite_db',
default='muranoapi.sqlite', default='muranoapi.sqlite',
@ -278,8 +276,6 @@ database_opts = [
'../', '$sqlite_db')), '../', '$sqlite_db')),
help='The SQLAlchemy connection string used to connect to the ' help='The SQLAlchemy connection string used to connect to the '
'database', 'database',
deprecated_name='sql_connection',
deprecated_group=DEFAULT,
secret=True), secret=True),
cfg.StrOpt('slave_connection', cfg.StrOpt('slave_connection',
default='', default='',
@ -288,47 +284,31 @@ database_opts = [
secret=True), secret=True),
cfg.IntOpt('idle_timeout', cfg.IntOpt('idle_timeout',
default=3600, default=3600,
deprecated_name='sql_idle_timeout',
deprecated_group=DEFAULT,
help='timeout before idle sql connections are reaped'), help='timeout before idle sql connections are reaped'),
cfg.IntOpt('min_pool_size', cfg.IntOpt('min_pool_size',
default=1, default=1,
deprecated_name='sql_min_pool_size',
deprecated_group=DEFAULT,
help='Minimum number of SQL connections to keep open in a ' help='Minimum number of SQL connections to keep open in a '
'pool'), 'pool'),
cfg.IntOpt('max_pool_size', cfg.IntOpt('max_pool_size',
default=None, default=None,
deprecated_name='sql_max_pool_size',
deprecated_group=DEFAULT,
help='Maximum number of SQL connections to keep open in a ' help='Maximum number of SQL connections to keep open in a '
'pool'), 'pool'),
cfg.IntOpt('max_retries', cfg.IntOpt('max_retries',
default=10, default=10,
deprecated_name='sql_max_retries',
deprecated_group=DEFAULT,
help='maximum db connection retries during startup. ' help='maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'), '(setting -1 implies an infinite retry count)'),
cfg.IntOpt('retry_interval', cfg.IntOpt('retry_interval',
default=10, default=10,
deprecated_name='sql_retry_interval',
deprecated_group=DEFAULT,
help='interval between retries of opening a sql connection'), help='interval between retries of opening a sql connection'),
cfg.IntOpt('max_overflow', cfg.IntOpt('max_overflow',
default=None, default=None,
deprecated_name='sql_max_overflow',
deprecated_group=DEFAULT,
help='If set, use this value for max_overflow with sqlalchemy'), help='If set, use this value for max_overflow with sqlalchemy'),
cfg.IntOpt('connection_debug', cfg.IntOpt('connection_debug',
default=0, default=0,
deprecated_name='sql_connection_debug',
deprecated_group=DEFAULT,
help='Verbosity of SQL debugging information. 0=None, ' help='Verbosity of SQL debugging information. 0=None, '
'100=Everything'), '100=Everything'),
cfg.BoolOpt('connection_trace', cfg.BoolOpt('connection_trace',
default=False, default=False,
deprecated_name='sql_connection_trace',
deprecated_group=DEFAULT,
help='Add python stack traces to SQL as comment strings'), help='Add python stack traces to SQL as comment strings'),
cfg.IntOpt('pool_timeout', cfg.IntOpt('pool_timeout',
default=None, default=None,
@ -338,6 +318,7 @@ database_opts = [
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_opts(sqlite_db_opts) CONF.register_opts(sqlite_db_opts)
CONF.register_opts(database_opts, 'database') CONF.register_opts(database_opts, 'database')
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
_ENGINE = None _ENGINE = None

342
muranoapi/openstack/common/db/sqlalchemy/utils.py Normal file → Executable file
View File

@ -18,12 +18,28 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
"""Implementation of paginate query."""
import sqlalchemy import sqlalchemy
from sqlalchemy import Boolean
from sqlalchemy import CheckConstraint
from sqlalchemy import Column
from sqlalchemy.engine import reflection
from sqlalchemy.ext.compiler import compiles
from sqlalchemy import func
from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy.sql import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.types import NullType
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import exception
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
from muranoapi.openstack.common import timeutils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -85,11 +101,14 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
# Add sorting # Add sorting
for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs):
sort_dir_func = { try:
'asc': sqlalchemy.asc, sort_dir_func = {
'desc': sqlalchemy.desc, 'asc': sqlalchemy.asc,
}[current_sort_dir] 'desc': sqlalchemy.desc,
}[current_sort_dir]
except KeyError:
raise ValueError(_("Unknown sort direction, "
"must be 'desc' or 'asc'"))
try: try:
sort_key_attr = getattr(model, current_sort_key) sort_key_attr = getattr(model, current_sort_key)
except AttributeError: except AttributeError:
@ -114,11 +133,8 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
model_attr = getattr(model, sort_keys[i]) model_attr = getattr(model, sort_keys[i])
if sort_dirs[i] == 'desc': if sort_dirs[i] == 'desc':
crit_attrs.append((model_attr < marker_values[i])) crit_attrs.append((model_attr < marker_values[i]))
elif sort_dirs[i] == 'asc':
crit_attrs.append((model_attr > marker_values[i]))
else: else:
raise ValueError(_("Unknown sort direction, " crit_attrs.append((model_attr > marker_values[i]))
"must be 'desc' or 'asc'"))
criteria = sqlalchemy.sql.and_(*crit_attrs) criteria = sqlalchemy.sql.and_(*crit_attrs)
criteria_list.append(criteria) criteria_list.append(criteria)
@ -130,3 +146,305 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
query = query.limit(limit) query = query.limit(limit)
return query return query
def get_table(engine, name):
"""Returns an sqlalchemy table dynamically from db.
Needed because the models don't work for us in migrations
as models will be far out of sync with the current data.
"""
metadata = MetaData()
metadata.bind = engine
return Table(name, metadata, autoload=True)
class InsertFromSelect(UpdateBase):
"""Form the base for `INSERT INTO table (SELECT ... )` statement."""
def __init__(self, table, select):
self.table = table
self.select = select
@compiles(InsertFromSelect)
def visit_insert_from_select(element, compiler, **kw):
"""Form the `INSERT INTO table (SELECT ... )` statement."""
return "INSERT INTO %s %s" % (
compiler.process(element.table, asfrom=True),
compiler.process(element.select))
def _get_not_supported_column(col_name_col_instance, column_name):
try:
column = col_name_col_instance[column_name]
except KeyError:
msg = _("Please specify column %s in col_name_col_instance "
"param. It is required because column has unsupported "
"type by sqlite).")
raise exception.OpenstackException(message=msg % column_name)
if not isinstance(column, Column):
msg = _("col_name_col_instance param has wrong type of "
"column instance for column %s It should be instance "
"of sqlalchemy.Column.")
raise exception.OpenstackException(message=msg % column_name)
return column
def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
use_soft_delete, *uc_column_names):
"""Drop all old rows having the same values for columns in uc_columns.
This method drop (or mark ad `deleted` if use_soft_delete is True) old
duplicate rows form table with name `table_name`.
:param migrate_engine: Sqlalchemy engine
:param table_name: Table with duplicates
:param use_soft_delete: If True - values will be marked as `deleted`,
if False - values will be removed from table
:param uc_column_names: Unique constraint columns
"""
meta = MetaData()
meta.bind = migrate_engine
table = Table(table_name, meta, autoload=True)
columns_for_group_by = [table.c[name] for name in uc_column_names]
columns_for_select = [func.max(table.c.id)]
columns_for_select.extend(columns_for_group_by)
duplicated_rows_select = select(columns_for_select,
group_by=columns_for_group_by,
having=func.count(table.c.id) > 1)
for row in migrate_engine.execute(duplicated_rows_select):
# NOTE(boris-42): Do not remove row that has the biggest ID.
delete_condition = table.c.id != row[0]
is_none = None # workaround for pyflakes
delete_condition &= table.c.deleted_at == is_none
for name in uc_column_names:
delete_condition &= table.c[name] == row[name]
rows_to_delete_select = select([table.c.id]).where(delete_condition)
for row in migrate_engine.execute(rows_to_delete_select).fetchall():
LOG.info(_("Deleting duplicated row with id: %(id)s from table: "
"%(table)s") % dict(id=row[0], table=table_name))
if use_soft_delete:
delete_statement = table.update().\
where(delete_condition).\
values({
'deleted': literal_column('id'),
'updated_at': literal_column('updated_at'),
'deleted_at': timeutils.utcnow()
})
else:
delete_statement = table.delete().where(delete_condition)
migrate_engine.execute(delete_statement)
def _get_default_deleted_value(table):
if isinstance(table.c.id.type, Integer):
return 0
if isinstance(table.c.id.type, String):
return ""
raise exception.OpenstackException(
message=_("Unsupported id columns type"))
def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes):
table = get_table(migrate_engine, table_name)
insp = reflection.Inspector.from_engine(migrate_engine)
real_indexes = insp.get_indexes(table_name)
existing_index_names = dict(
[(index['name'], index['column_names']) for index in real_indexes])
# NOTE(boris-42): Restore indexes on `deleted` column
for index in indexes:
if 'deleted' not in index['column_names']:
continue
name = index['name']
if name in existing_index_names:
column_names = [table.c[c] for c in existing_index_names[name]]
old_index = Index(name, *column_names, unique=index["unique"])
old_index.drop(migrate_engine)
column_names = [table.c[c] for c in index['column_names']]
new_index = Index(index["name"], *column_names, unique=index["unique"])
new_index.create(migrate_engine)
def change_deleted_column_type_to_boolean(migrate_engine, table_name,
**col_name_col_instance):
if migrate_engine.name == "sqlite":
return _change_deleted_column_type_to_boolean_sqlite(
migrate_engine, table_name, **col_name_col_instance)
insp = reflection.Inspector.from_engine(migrate_engine)
indexes = insp.get_indexes(table_name)
table = get_table(migrate_engine, table_name)
old_deleted = Column('old_deleted', Boolean, default=False)
old_deleted.create(table, populate_default=False)
table.update().\
where(table.c.deleted == table.c.id).\
values(old_deleted=True).\
execute()
table.c.deleted.drop()
table.c.old_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
**col_name_col_instance):
insp = reflection.Inspector.from_engine(migrate_engine)
table = get_table(migrate_engine, table_name)
columns = []
for column in table.columns:
column_copy = None
if column.name != "deleted":
if isinstance(column.type, NullType):
column_copy = _get_not_supported_column(col_name_col_instance,
column.name)
else:
column_copy = column.copy()
else:
column_copy = Column('deleted', Boolean, default=0)
columns.append(column_copy)
constraints = [constraint.copy() for constraint in table.constraints]
meta = MetaData(bind=migrate_engine)
new_table = Table(table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create()
indexes = []
for index in insp.get_indexes(table_name):
column_names = [new_table.c[c] for c in index['column_names']]
indexes.append(Index(index["name"], *column_names,
unique=index["unique"]))
c_select = []
for c in table.c:
if c.name != "deleted":
c_select.append(c)
else:
c_select.append(table.c.deleted == table.c.id)
ins = InsertFromSelect(new_table, select(c_select))
migrate_engine.execute(ins)
table.drop()
[index.create(migrate_engine) for index in indexes]
new_table.rename(table_name)
new_table.update().\
where(new_table.c.deleted == new_table.c.id).\
values(deleted=True).\
execute()
def change_deleted_column_type_to_id_type(migrate_engine, table_name,
**col_name_col_instance):
if migrate_engine.name == "sqlite":
return _change_deleted_column_type_to_id_type_sqlite(
migrate_engine, table_name, **col_name_col_instance)
insp = reflection.Inspector.from_engine(migrate_engine)
indexes = insp.get_indexes(table_name)
table = get_table(migrate_engine, table_name)
new_deleted = Column('new_deleted', table.c.id.type,
default=_get_default_deleted_value(table))
new_deleted.create(table, populate_default=True)
deleted = True # workaround for pyflakes
table.update().\
where(table.c.deleted == deleted).\
values(new_deleted=table.c.id).\
execute()
table.c.deleted.drop()
table.c.new_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name,
**col_name_col_instance):
# NOTE(boris-42): sqlaclhemy-migrate can't drop column with check
# constraints in sqlite DB and our `deleted` column has
# 2 check constraints. So there is only one way to remove
# these constraints:
# 1) Create new table with the same columns, constraints
# and indexes. (except deleted column).
# 2) Copy all data from old to new table.
# 3) Drop old table.
# 4) Rename new table to old table name.
insp = reflection.Inspector.from_engine(migrate_engine)
meta = MetaData(bind=migrate_engine)
table = Table(table_name, meta, autoload=True)
default_deleted_value = _get_default_deleted_value(table)
columns = []
for column in table.columns:
column_copy = None
if column.name != "deleted":
if isinstance(column.type, NullType):
column_copy = _get_not_supported_column(col_name_col_instance,
column.name)
else:
column_copy = column.copy()
else:
column_copy = Column('deleted', table.c.id.type,
default=default_deleted_value)
columns.append(column_copy)
def is_deleted_column_constraint(constraint):
# NOTE(boris-42): There is no other way to check is CheckConstraint
# associated with deleted column.
if not isinstance(constraint, CheckConstraint):
return False
sqltext = str(constraint.sqltext)
return (sqltext.endswith("deleted in (0, 1)") or
sqltext.endswith("deleted IN (:deleted_1, :deleted_2)"))
constraints = []
for constraint in table.constraints:
if not is_deleted_column_constraint(constraint):
constraints.append(constraint.copy())
new_table = Table(table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create()
indexes = []
for index in insp.get_indexes(table_name):
column_names = [new_table.c[c] for c in index['column_names']]
indexes.append(Index(index["name"], *column_names,
unique=index["unique"]))
ins = InsertFromSelect(new_table, table.select())
migrate_engine.execute(ins)
table.drop()
[index.create(migrate_engine) for index in indexes]
new_table.rename(table_name)
deleted = True # workaround for pyflakes
new_table.update().\
where(new_table.c.deleted == deleted).\
values(deleted=new_table.c.id).\
execute()
# NOTE(boris-42): Fix value of deleted column: False -> "" or 0.
deleted = False # workaround for pyflakes
new_table.update().\
where(new_table.c.deleted == deleted).\
values(deleted=default_deleted_value).\
execute()

View File

@ -18,8 +18,11 @@
from __future__ import print_function from __future__ import print_function
import errno
import gc import gc
import os
import pprint import pprint
import socket
import sys import sys
import traceback import traceback
@ -28,14 +31,34 @@ import eventlet.backdoor
import greenlet import greenlet
from oslo.config import cfg from oslo.config import cfg
from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import log as logging
help_for_backdoor_port = (
"Acceptable values are 0, <port>, and <start>:<end>, where 0 results "
"in listening on a random tcp port number; <port> results in listening "
"on the specified port number (and not enabling backdoor if that port "
"is in use); and <start>:<end> results in listening on the smallest "
"unused port number within the specified range of port numbers. The "
"chosen port is displayed in the service's log file.")
eventlet_backdoor_opts = [ eventlet_backdoor_opts = [
cfg.IntOpt('backdoor_port', cfg.StrOpt('backdoor_port',
default=None, default=None,
help='port for eventlet backdoor to listen') help="Enable eventlet backdoor. %s" % help_for_backdoor_port)
] ]
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_opts(eventlet_backdoor_opts) CONF.register_opts(eventlet_backdoor_opts)
LOG = logging.getLogger(__name__)
class EventletBackdoorConfigValueError(Exception):
def __init__(self, port_range, help_msg, ex):
msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. '
'%(help)s' %
{'range': port_range, 'ex': ex, 'help': help_msg})
super(EventletBackdoorConfigValueError, self).__init__(msg)
self.port_range = port_range
def _dont_use_this(): def _dont_use_this():
@ -60,6 +83,32 @@ def _print_nativethreads():
print() print()
def _parse_port_range(port_range):
if ':' not in port_range:
start, end = port_range, port_range
else:
start, end = port_range.split(':', 1)
try:
start, end = int(start), int(end)
if end < start:
raise ValueError
return start, end
except ValueError as ex:
raise EventletBackdoorConfigValueError(port_range, ex,
help_for_backdoor_port)
def _listen(host, start_port, end_port, listen_func):
try_port = start_port
while True:
try:
return listen_func((host, try_port))
except socket.error as exc:
if (exc.errno != errno.EADDRINUSE or try_port >= end_port):
raise
try_port += 1
def initialize_if_enabled(): def initialize_if_enabled():
backdoor_locals = { backdoor_locals = {
'exit': _dont_use_this, # So we don't exit the entire process 'exit': _dont_use_this, # So we don't exit the entire process
@ -72,6 +121,8 @@ def initialize_if_enabled():
if CONF.backdoor_port is None: if CONF.backdoor_port is None:
return None return None
start_port, end_port = _parse_port_range(str(CONF.backdoor_port))
# NOTE(johannes): The standard sys.displayhook will print the value of # NOTE(johannes): The standard sys.displayhook will print the value of
# the last expression and set it to __builtin__._, which overwrites # the last expression and set it to __builtin__._, which overwrites
# the __builtin__._ that gettext sets. Let's switch to using pprint # the __builtin__._ that gettext sets. Let's switch to using pprint
@ -82,8 +133,13 @@ def initialize_if_enabled():
pprint.pprint(val) pprint.pprint(val)
sys.displayhook = displayhook sys.displayhook = displayhook
sock = eventlet.listen(('localhost', CONF.backdoor_port)) sock = _listen('localhost', start_port, end_port, eventlet.listen)
# In the case of backdoor port being zero, a port number is assigned by
# listen(). In any case, pull the port number out here.
port = sock.getsockname()[1] port = sock.getsockname()[1]
LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') %
{'port': port, 'pid': os.getpid()})
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
locals=backdoor_locals) locals=backdoor_locals)
return port return port

View File

@ -21,7 +21,7 @@ Exceptions common to OpenStack projects
import logging import logging
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
_FATAL_EXCEPTION_FORMAT_ERRORS = False _FATAL_EXCEPTION_FORMAT_ERRORS = False
@ -33,7 +33,7 @@ class Error(Exception):
class ApiError(Error): class ApiError(Error):
def __init__(self, message='Unknown', code='Unknown'): def __init__(self, message='Unknown', code='Unknown'):
self.message = message self.api_message = message
self.code = code self.code = code
super(ApiError, self).__init__('%s: %s' % (code, message)) super(ApiError, self).__init__('%s: %s' % (code, message))
@ -44,19 +44,19 @@ class NotFound(Error):
class UnknownScheme(Error): class UnknownScheme(Error):
msg = "Unknown scheme '%s' found in URI" msg_fmt = "Unknown scheme '%s' found in URI"
def __init__(self, scheme): def __init__(self, scheme):
msg = self.__class__.msg % scheme msg = self.msg_fmt % scheme
super(UnknownScheme, self).__init__(msg) super(UnknownScheme, self).__init__(msg)
class BadStoreUri(Error): class BadStoreUri(Error):
msg = "The Store URI %s was malformed. Reason: %s" msg_fmt = "The Store URI %s was malformed. Reason: %s"
def __init__(self, uri, reason): def __init__(self, uri, reason):
msg = self.__class__.msg % (uri, reason) msg = self.msg_fmt % (uri, reason)
super(BadStoreUri, self).__init__(msg) super(BadStoreUri, self).__init__(msg)
@ -100,9 +100,7 @@ def wrap_exception(f):
return f(*args, **kw) return f(*args, **kw)
except Exception as e: except Exception as e:
if not isinstance(e, Error): if not isinstance(e, Error):
#exc_type, exc_value, exc_traceback = sys.exc_info()
logging.exception(_('Uncaught exception')) logging.exception(_('Uncaught exception'))
#logging.error(traceback.extract_stack(exc_traceback))
raise Error(str(e)) raise Error(str(e))
raise raise
_wrap.func_name = f.func_name _wrap.func_name = f.func_name
@ -113,29 +111,29 @@ class OpenstackException(Exception):
"""Base Exception class. """Base Exception class.
To correctly use this class, inherit from it and define To correctly use this class, inherit from it and define
a 'message' property. That message will get printf'd a 'msg_fmt' property. That message will get printf'd
with the keyword arguments provided to the constructor. with the keyword arguments provided to the constructor.
""" """
message = "An unknown exception occurred" msg_fmt = "An unknown exception occurred"
def __init__(self, **kwargs): def __init__(self, **kwargs):
try: try:
self._error_string = self.message % kwargs self._error_string = self.msg_fmt % kwargs
except Exception as e: except Exception:
if _FATAL_EXCEPTION_FORMAT_ERRORS: if _FATAL_EXCEPTION_FORMAT_ERRORS:
raise e raise
else: else:
# at least get the core message out if something happened # at least get the core message out if something happened
self._error_string = self.message self._error_string = self.msg_fmt
def __str__(self): def __str__(self):
return self._error_string return self._error_string
class MalformedRequestBody(OpenstackException): class MalformedRequestBody(OpenstackException):
message = "Malformed message body: %(reason)s" msg_fmt = "Malformed message body: %(reason)s"
class InvalidContentType(OpenstackException): class InvalidContentType(OpenstackException):
message = "Invalid content type %(content_type)s" msg_fmt = "Invalid content type %(content_type)s"

View File

@ -19,16 +19,15 @@
Exception related utilities. Exception related utilities.
""" """
import contextlib
import logging import logging
import sys import sys
import time
import traceback import traceback
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
@contextlib.contextmanager class save_and_reraise_exception(object):
def save_and_reraise_exception():
"""Save current exception, run some code and then re-raise. """Save current exception, run some code and then re-raise.
In some cases the exception context can be cleared, resulting in None In some cases the exception context can be cleared, resulting in None
@ -40,12 +39,60 @@ def save_and_reraise_exception():
To work around this, we save the exception state, run handler code, and To work around this, we save the exception state, run handler code, and
then re-raise the original exception. If another exception occurs, the then re-raise the original exception. If another exception occurs, the
saved exception is logged and the new exception is re-raised. saved exception is logged and the new exception is re-raised.
"""
type_, value, tb = sys.exc_info() In some cases the caller may not want to re-raise the exception, and
try: for those circumstances this context provides a reraise flag that
yield can be used to suppress the exception. For example:
except Exception: except Exception:
logging.error(_('Original exception being dropped: %s'), with save_and_reraise_exception() as ctxt:
traceback.format_exception(type_, value, tb)) decide_if_need_reraise()
raise if not should_be_reraised:
raise type_, value, tb ctxt.reraise = False
"""
def __init__(self):
self.reraise = True
def __enter__(self):
self.type_, self.value, self.tb, = sys.exc_info()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
logging.error(_('Original exception being dropped: %s'),
traceback.format_exception(self.type_,
self.value,
self.tb))
return False
if self.reraise:
raise self.type_, self.value, self.tb
def forever_retry_uncaught_exceptions(infunc):
def inner_func(*args, **kwargs):
last_log_time = 0
last_exc_message = None
exc_count = 0
while True:
try:
return infunc(*args, **kwargs)
except Exception as exc:
if exc.message == last_exc_message:
exc_count += 1
else:
exc_count = 1
# Do not log any more frequently than once a minute unless
# the exception message changes
cur_time = int(time.time())
if (cur_time - last_log_time > 60 or
exc.message != last_exc_message):
logging.exception(
_('Unexpected exception occurred %d time(s)... '
'retrying.') % exc_count)
last_log_time = cur_time
last_exc_message = exc.message
exc_count = 0
# This should be a very rare event. In case it isn't, do
# a sleep.
time.sleep(1)
return inner_func

View File

@ -21,7 +21,7 @@ import errno
import os import os
from muranoapi.openstack.common import excutils from muranoapi.openstack.common import excutils
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)

View File

@ -28,8 +28,11 @@ import copy
import gettext import gettext
import logging.handlers import logging.handlers
import os import os
import re
import UserString import UserString
import six
_localedir = os.environ.get('muranoapi'.upper() + '_LOCALEDIR') _localedir = os.environ.get('muranoapi'.upper() + '_LOCALEDIR')
_t = gettext.translation('muranoapi', localedir=_localedir, fallback=True) _t = gettext.translation('muranoapi', localedir=_localedir, fallback=True)
@ -120,7 +123,29 @@ class Message(UserString.UserString, object):
if self.params is not None: if self.params is not None:
full_msg = full_msg % self.params full_msg = full_msg % self.params
return unicode(full_msg) return six.text_type(full_msg)
def _save_dictionary_parameter(self, dict_param):
full_msg = self.data
# look for %(blah) fields in string;
# ignore %% and deal with the
# case where % is first character on the line
keys = re.findall('(?:[^%]|^)%\((\w*)\)[a-z]', full_msg)
# if we don't find any %(blah) blocks but have a %s
if not keys and re.findall('(?:[^%]|^)%[a-z]', full_msg):
# apparently the full dictionary is the parameter
params = copy.deepcopy(dict_param)
else:
params = {}
for key in keys:
try:
params[key] = copy.deepcopy(dict_param[key])
except TypeError:
# cast uncopyable thing to unicode string
params[key] = unicode(dict_param[key])
return params
def _save_parameters(self, other): def _save_parameters(self, other):
# we check for None later to see if # we check for None later to see if
@ -128,8 +153,16 @@ class Message(UserString.UserString, object):
# so encapsulate if our parameter is actually None # so encapsulate if our parameter is actually None
if other is None: if other is None:
self.params = (other, ) self.params = (other, )
elif isinstance(other, dict):
self.params = self._save_dictionary_parameter(other)
else: else:
self.params = copy.deepcopy(other) # fallback to casting to unicode,
# this will handle the problematic python code-like
# objects that cannot be deep-copied
try:
self.params = copy.deepcopy(other)
except TypeError:
self.params = unicode(other)
return self return self

View File

@ -41,6 +41,7 @@ import json
import types import types
import xmlrpclib import xmlrpclib
import netaddr
import six import six
from muranoapi.openstack.common import timeutils from muranoapi.openstack.common import timeutils
@ -137,6 +138,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# Likely an instance of something. Watch for cycles. # Likely an instance of something. Watch for cycles.
# Ignore class member vars. # Ignore class member vars.
return recursive(value.__dict__, level=level + 1) return recursive(value.__dict__, level=level + 1)
elif isinstance(value, netaddr.IPAddress):
return six.text_type(value)
else: else:
if any(test(value) for test in _nasty_type_tests): if any(test(value) for test in _nasty_type_tests):
return six.text_type(value) return six.text_type(value)

View File

@ -16,11 +16,10 @@
# under the License. # under the License.
import contextlib
import errno import errno
import functools import functools
import os import os
import shutil
import tempfile
import time import time
import weakref import weakref
@ -28,7 +27,7 @@ from eventlet import semaphore
from oslo.config import cfg from oslo.config import cfg
from muranoapi.openstack.common import fileutils from muranoapi.openstack.common import fileutils
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import local from muranoapi.openstack.common import local
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
@ -40,8 +39,7 @@ util_opts = [
cfg.BoolOpt('disable_process_locking', default=False, cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'), help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path', cfg.StrOpt('lock_path',
help=('Directory to use for lock files. Default to a ' help=('Directory to use for lock files.'))
'temp directory'))
] ]
@ -135,7 +133,87 @@ else:
_semaphores = weakref.WeakValueDictionary() _semaphores = weakref.WeakValueDictionary()
def synchronized(name, lock_file_prefix, external=False, lock_path=None): @contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
This function yields a `semaphore.Semaphore` instance unless external is
True, in which case, it'll yield an InterProcessLock instance.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
lock files on disk with a meaningful prefix.
:param external: The external keyword argument denotes whether this lock
should work across multiple processes. This means that if two different
workers both run a a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
"""
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
{'lock': name})
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
raise cfg.RequiredOptError('lock_path')
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
LOG.info(_('Created lock path: %s'), local_lock_path)
def add_prefix(name, prefix):
if not prefix:
return name
sep = '' if prefix.endswith('-') else '-'
return '%s%s%s' % (prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
lock_file_name = add_prefix(name.replace(os.sep, '_'),
lock_file_prefix)
lock_file_path = os.path.join(local_lock_path, lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock as lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
yield lock
finally:
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
else:
yield sem
finally:
local.strong_store.locks_held.remove(name)
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
"""Synchronization decorator. """Synchronization decorator.
Decorating a method like so:: Decorating a method like so::
@ -157,99 +235,18 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
... ...
This way only one of either foo or bar can be executing at a time. This way only one of either foo or bar can be executing at a time.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
lock files on disk with a meaningful prefix. The prefix should end with a
hyphen ('-') if specified.
:param external: The external keyword argument denotes whether this lock
should work across multiple processes. This means that if two different
workers both run a a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
""" """
def wrap(f): def wrap(f):
@functools.wraps(f) @functools.wraps(f)
def inner(*args, **kwargs): def inner(*args, **kwargs):
# NOTE(soren): If we ever go natively threaded, this will be racy. with lock(name, lock_file_prefix, external, lock_path):
# See http://stackoverflow.com/questions/5390569/dyn LOG.debug(_('Got semaphore / lock "%(function)s"'),
# amically-allocating-and-destroying-mutexes {'function': f.__name__})
sem = _semaphores.get(name, semaphore.Semaphore()) return f(*args, **kwargs)
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with sem: LOG.debug(_('Semaphore / lock released "%(function)s"'),
LOG.debug(_('Got semaphore "%(lock)s" for method ' {'function': f.__name__})
'"%(method)s"...'), {'lock': name,
'method': f.__name__})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s" '
'for method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
cleanup_dir = False
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = CONF.lock_path
if not local_lock_path:
cleanup_dir = True
local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
# NOTE(mikal): the lock name cannot contain directory
# separators
safe_name = name.replace(os.sep, '_')
lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
lock_file_path = os.path.join(local_lock_path,
lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock:
LOG.debug(_('Got file lock "%(lock)s" at '
'%(path)s for method '
'"%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
retval = f(*args, **kwargs)
finally:
LOG.debug(_('Released file lock "%(lock)s" at '
'%(path)s for method "%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to
# cleanup the locks left behind by unit
# tests.
if cleanup_dir:
shutil.rmtree(local_lock_path)
else:
retval = f(*args, **kwargs)
finally:
local.strong_store.locks_held.remove(name)
return retval
return inner return inner
return wrap return wrap
@ -273,7 +270,7 @@ def synchronized_with_prefix(lock_file_prefix):
... ...
The lock_file_prefix argument is used to provide lock files on disk with a The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix. The prefix should end with a hyphen ('-') if specified. meaningful prefix.
""" """
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix) return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)

View File

@ -29,8 +29,6 @@ It also allows setting of formatting information through conf.
""" """
import ConfigParser
import cStringIO
import inspect import inspect
import itertools import itertools
import logging import logging
@ -41,8 +39,9 @@ import sys
import traceback import traceback
from oslo.config import cfg from oslo.config import cfg
from six import moves
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import importutils from muranoapi.openstack.common import importutils
from muranoapi.openstack.common import jsonutils from muranoapi.openstack.common import jsonutils
from muranoapi.openstack.common import local from muranoapi.openstack.common import local
@ -74,7 +73,8 @@ logging_cli_opts = [
cfg.StrOpt('log-format', cfg.StrOpt('log-format',
default=None, default=None,
metavar='FORMAT', metavar='FORMAT',
help='A logging.Formatter log message format string which may ' help='DEPRECATED. '
'A logging.Formatter log message format string which may '
'use any of the available logging.LogRecord attributes. ' 'use any of the available logging.LogRecord attributes. '
'This option is deprecated. Please use ' 'This option is deprecated. Please use '
'logging_context_format_string and ' 'logging_context_format_string and '
@ -347,7 +347,7 @@ class LogConfigError(Exception):
def _load_log_config(log_config): def _load_log_config(log_config):
try: try:
logging.config.fileConfig(log_config) logging.config.fileConfig(log_config)
except ConfigParser.Error as exc: except moves.configparser.Error as exc:
raise LogConfigError(log_config, str(exc)) raise LogConfigError(log_config, str(exc))
@ -520,7 +520,7 @@ class ContextFormatter(logging.Formatter):
if not record: if not record:
return logging.Formatter.formatException(self, exc_info) return logging.Formatter.formatException(self, exc_info)
stringbuffer = cStringIO.StringIO() stringbuffer = moves.StringIO()
traceback.print_exception(exc_info[0], exc_info[1], exc_info[2], traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
None, stringbuffer) None, stringbuffer)
lines = stringbuffer.getvalue().split('\n') lines = stringbuffer.getvalue().split('\n')

View File

@ -22,7 +22,7 @@ import sys
from eventlet import event from eventlet import event
from eventlet import greenthread from eventlet import greenthread
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
from muranoapi.openstack.common import timeutils from muranoapi.openstack.common import timeutils

View File

@ -19,10 +19,7 @@
Network-related utilities and helper functions. Network-related utilities and helper functions.
""" """
from muranoapi.openstack.common import log as logging import urlparse
LOG = logging.getLogger(__name__)
def parse_host_port(address, default_port=None): def parse_host_port(address, default_port=None):
@ -67,3 +64,18 @@ def parse_host_port(address, default_port=None):
port = default_port port = default_port
return (host, None if port is None else int(port)) return (host, None if port is None else int(port))
def urlsplit(url, scheme='', allow_fragments=True):
"""Parse a URL using urlparse.urlsplit(), splitting query and fragments.
This function papers over Python issue9374 when needed.
The parameters are the same as urlparse.urlsplit.
"""
scheme, netloc, path, query, fragment = urlparse.urlsplit(
url, scheme, allow_fragments)
if allow_fragments and '#' in path:
path, fragment = path.split('#', 1)
if '?' in path:
path, query = path.split('?', 1)
return urlparse.SplitResult(scheme, netloc, path, query, fragment)

View File

@ -13,12 +13,13 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import socket
import uuid import uuid
from oslo.config import cfg from oslo.config import cfg
from muranoapi.openstack.common import context from muranoapi.openstack.common import context
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import importutils from muranoapi.openstack.common import importutils
from muranoapi.openstack.common import jsonutils from muranoapi.openstack.common import jsonutils
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
@ -35,7 +36,7 @@ notifier_opts = [
default='INFO', default='INFO',
help='Default notification level for outgoing notifications'), help='Default notification level for outgoing notifications'),
cfg.StrOpt('default_publisher_id', cfg.StrOpt('default_publisher_id',
default='$host', default=None,
help='Default publisher_id for outgoing notifications'), help='Default publisher_id for outgoing notifications'),
] ]
@ -74,7 +75,7 @@ def notify_decorator(name, fn):
ctxt = context.get_context_from_function_and_args(fn, args, kwarg) ctxt = context.get_context_from_function_and_args(fn, args, kwarg)
notify(ctxt, notify(ctxt,
CONF.default_publisher_id, CONF.default_publisher_id or socket.gethostname(),
name, name,
CONF.default_notification_level, CONF.default_notification_level,
body) body)
@ -84,7 +85,10 @@ def notify_decorator(name, fn):
def publisher_id(service, host=None): def publisher_id(service, host=None):
if not host: if not host:
host = CONF.host try:
host = CONF.host
except AttributeError:
host = CONF.default_publisher_id or socket.gethostname()
return "%s.%s" % (service, host) return "%s.%s" % (service, host)
@ -153,29 +157,16 @@ def _get_drivers():
if _drivers is None: if _drivers is None:
_drivers = {} _drivers = {}
for notification_driver in CONF.notification_driver: for notification_driver in CONF.notification_driver:
add_driver(notification_driver) try:
driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver
except ImportError:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
notification_driver)
return _drivers.values() return _drivers.values()
def add_driver(notification_driver):
"""Add a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
if isinstance(notification_driver, basestring):
# Load and add
try:
driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver
except ImportError:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
notification_driver)
else:
# Driver is already loaded; just add the object.
_drivers[notification_driver] = notification_driver
def _reset_drivers(): def _reset_drivers():
"""Used by unit tests to reset the drivers.""" """Used by unit tests to reset the drivers."""
global _drivers global _drivers

View File

@ -1,29 +0,0 @@
# Copyright 2012 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from muranoapi.openstack.common.gettextutils import _
from muranoapi.openstack.common import log as logging
from muranoapi.openstack.common.notifier import rpc_notifier
LOG = logging.getLogger(__name__)
def notify(context, message):
"""Deprecated in Grizzly. Please use rpc_notifier instead."""
LOG.deprecated(_("The rabbit_notifier is now deprecated."
" Please use rpc_notifier instead."))
rpc_notifier.notify(context, message)

View File

@ -16,7 +16,7 @@
from oslo.config import cfg from oslo.config import cfg
from muranoapi.openstack.common import context as req_context from muranoapi.openstack.common import context as req_context
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
from muranoapi.openstack.common import rpc from muranoapi.openstack.common import rpc

View File

@ -18,7 +18,7 @@
from oslo.config import cfg from oslo.config import cfg
from muranoapi.openstack.common import context as req_context from muranoapi.openstack.common import context as req_context
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
from muranoapi.openstack.common import rpc from muranoapi.openstack.common import rpc

View File

@ -29,7 +29,7 @@ import inspect
from oslo.config import cfg from oslo.config import cfg
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import importutils from muranoapi.openstack.common import importutils
from muranoapi.openstack.common import local from muranoapi.openstack.common import local
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging

View File

@ -34,14 +34,28 @@ from eventlet import greenpool
from eventlet import pools from eventlet import pools
from eventlet import queue from eventlet import queue
from eventlet import semaphore from eventlet import semaphore
from oslo.config import cfg
from muranoapi.openstack.common import excutils from muranoapi.openstack.common import excutils
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import local from muranoapi.openstack.common import local
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
from muranoapi.openstack.common.rpc import common as rpc_common from muranoapi.openstack.common.rpc import common as rpc_common
amqp_opts = [
cfg.BoolOpt('amqp_durable_queues',
default=False,
deprecated_name='rabbit_durable_queues',
deprecated_group='DEFAULT',
help='Use durable queues in amqp.'),
cfg.BoolOpt('amqp_auto_delete',
default=False,
help='Auto-delete queues in amqp.'),
]
cfg.CONF.register_opts(amqp_opts)
UNIQUE_ID = '_unique_id' UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -151,11 +165,13 @@ class ConnectionContext(rpc_common.Connection):
def create_worker(self, topic, proxy, pool_name): def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name) self.connection.create_worker(topic, proxy, pool_name)
def join_consumer_pool(self, callback, pool_name, topic, exchange_name): def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
ack_on_error=True):
self.connection.join_consumer_pool(callback, self.connection.join_consumer_pool(callback,
pool_name, pool_name,
topic, topic,
exchange_name) exchange_name,
ack_on_error)
def consume_in_thread(self): def consume_in_thread(self):
self.connection.consume_in_thread() self.connection.consume_in_thread()
@ -219,12 +235,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
failure = rpc_common.serialize_remote_exception(failure, failure = rpc_common.serialize_remote_exception(failure,
log_failure) log_failure)
try: msg = {'result': reply, 'failure': failure}
msg = {'result': reply, 'failure': failure}
except TypeError:
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
if ending: if ending:
msg['ending'] = True msg['ending'] = True
_add_unique_id(msg) _add_unique_id(msg)

View File

@ -24,7 +24,7 @@ import traceback
from oslo.config import cfg from oslo.config import cfg
import six import six
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import importutils from muranoapi.openstack.common import importutils
from muranoapi.openstack.common import jsonutils from muranoapi.openstack.common import jsonutils
from muranoapi.openstack.common import local from muranoapi.openstack.common import local
@ -74,14 +74,14 @@ _REMOTE_POSTFIX = '_Remote'
class RPCException(Exception): class RPCException(Exception):
message = _("An unknown RPC related exception occurred.") msg_fmt = _("An unknown RPC related exception occurred.")
def __init__(self, message=None, **kwargs): def __init__(self, message=None, **kwargs):
self.kwargs = kwargs self.kwargs = kwargs
if not message: if not message:
try: try:
message = self.message % kwargs message = self.msg_fmt % kwargs
except Exception: except Exception:
# kwargs doesn't match a variable in the message # kwargs doesn't match a variable in the message
@ -90,7 +90,7 @@ class RPCException(Exception):
for name, value in kwargs.iteritems(): for name, value in kwargs.iteritems():
LOG.error("%s: %s" % (name, value)) LOG.error("%s: %s" % (name, value))
# at least get the core message out if something happened # at least get the core message out if something happened
message = self.message message = self.msg_fmt
super(RPCException, self).__init__(message) super(RPCException, self).__init__(message)
@ -104,7 +104,7 @@ class RemoteError(RPCException):
contains all of the relevant info. contains all of the relevant info.
""" """
message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
def __init__(self, exc_type=None, value=None, traceback=None): def __init__(self, exc_type=None, value=None, traceback=None):
self.exc_type = exc_type self.exc_type = exc_type
@ -121,7 +121,7 @@ class Timeout(RPCException):
This exception is raised if the rpc_response_timeout is reached while This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side. waiting for a response from the remote side.
""" """
message = _('Timeout while waiting on RPC response - ' msg_fmt = _('Timeout while waiting on RPC response - '
'topic: "%(topic)s", RPC method: "%(method)s" ' 'topic: "%(topic)s", RPC method: "%(method)s" '
'info: "%(info)s"') 'info: "%(info)s"')
@ -144,25 +144,25 @@ class Timeout(RPCException):
class DuplicateMessageError(RPCException): class DuplicateMessageError(RPCException):
message = _("Found duplicate message(%(msg_id)s). Skipping it.") msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
class InvalidRPCConnectionReuse(RPCException): class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.") msg_fmt = _("Invalid reuse of an RPC connection.")
class UnsupportedRpcVersion(RPCException): class UnsupportedRpcVersion(RPCException):
message = _("Specified RPC version, %(version)s, not supported by " msg_fmt = _("Specified RPC version, %(version)s, not supported by "
"this endpoint.") "this endpoint.")
class UnsupportedRpcEnvelopeVersion(RPCException): class UnsupportedRpcEnvelopeVersion(RPCException):
message = _("Specified RPC envelope version, %(version)s, " msg_fmt = _("Specified RPC envelope version, %(version)s, "
"not supported by this endpoint.") "not supported by this endpoint.")
class RpcVersionCapError(RPCException): class RpcVersionCapError(RPCException):
message = _("Specified RPC version cap, %(version_cap)s, is too low") msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
class Connection(object): class Connection(object):
@ -261,41 +261,20 @@ class Connection(object):
def _safe_log(log_func, msg, msg_data): def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging.""" """Sanitizes the msg_data field before logging."""
SANITIZE = {'set_admin_password': [('args', 'new_pass')], SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
'run_instance': [('args', 'admin_password')],
'route_message': [('args', 'message', 'args', 'method_info',
'method_kwargs', 'password'),
('args', 'message', 'args', 'method_info',
'method_kwargs', 'admin_password')]}
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE def _fix_passwords(d):
has_context_token = '_context_auth_token' in msg_data """Sanitizes the password fields in the dictionary."""
has_token = 'auth_token' in msg_data for k in d.iterkeys():
if k.lower().find('password') != -1:
d[k] = '<SANITIZED>'
elif k.lower() in SANITIZE:
d[k] = '<SANITIZED>'
elif isinstance(d[k], dict):
_fix_passwords(d[k])
return d
if not any([has_method, has_context_token, has_token]): return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
return log_func(msg, msg_data)
msg_data = copy.deepcopy(msg_data)
if has_method:
for arg in SANITIZE.get(msg_data['method'], []):
try:
d = msg_data
for elem in arg[:-1]:
d = d[elem]
d[arg[-1]] = '<SANITIZED>'
except KeyError as e:
LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
{'item': arg,
'err': e})
if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>'
if has_token:
msg_data['auth_token'] = '<SANITIZED>'
return log_func(msg, msg_data)
def serialize_remote_exception(failure_info, log_failure=True): def serialize_remote_exception(failure_info, log_failure=True):

View File

@ -18,7 +18,6 @@ import functools
import itertools import itertools
import socket import socket
import ssl import ssl
import sys
import time import time
import uuid import uuid
@ -30,15 +29,20 @@ import kombu.entity
import kombu.messaging import kombu.messaging
from oslo.config import cfg from oslo.config import cfg
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common import excutils
from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import network_utils from muranoapi.openstack.common import network_utils
from muranoapi.openstack.common.rpc import amqp as rpc_amqp from muranoapi.openstack.common.rpc import amqp as rpc_amqp
from muranoapi.openstack.common.rpc import common as rpc_common from muranoapi.openstack.common.rpc import common as rpc_common
from muranoapi.openstack.common import sslutils
kombu_opts = [ kombu_opts = [
cfg.StrOpt('kombu_ssl_version', cfg.StrOpt('kombu_ssl_version',
default='', default='',
help='SSL version to use (valid only if SSL enabled)'), help='SSL version to use (valid only if SSL enabled). '
'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
'be available on some distributions'
),
cfg.StrOpt('kombu_ssl_keyfile', cfg.StrOpt('kombu_ssl_keyfile',
default='', default='',
help='SSL key file (valid only if SSL enabled)'), help='SSL key file (valid only if SSL enabled)'),
@ -82,9 +86,6 @@ kombu_opts = [
default=0, default=0,
help='maximum retries with trying to connect to RabbitMQ ' help='maximum retries with trying to connect to RabbitMQ '
'(the default of 0 implies an infinite retry count)'), '(the default of 0 implies an infinite retry count)'),
cfg.BoolOpt('rabbit_durable_queues',
default=False,
help='use durable queues in RabbitMQ'),
cfg.BoolOpt('rabbit_ha_queues', cfg.BoolOpt('rabbit_ha_queues',
default=False, default=False,
help='use H/A queues in RabbitMQ (x-ha-policy: all).' help='use H/A queues in RabbitMQ (x-ha-policy: all).'
@ -129,6 +130,7 @@ class ConsumerBase(object):
self.tag = str(tag) self.tag = str(tag)
self.kwargs = kwargs self.kwargs = kwargs
self.queue = None self.queue = None
self.ack_on_error = kwargs.get('ack_on_error', True)
self.reconnect(channel) self.reconnect(channel)
def reconnect(self, channel): def reconnect(self, channel):
@ -138,6 +140,36 @@ class ConsumerBase(object):
self.queue = kombu.entity.Queue(**self.kwargs) self.queue = kombu.entity.Queue(**self.kwargs)
self.queue.declare() self.queue.declare()
def _callback_handler(self, message, callback):
"""Call callback with deserialized message.
Messages that are processed without exception are ack'ed.
If the message processing generates an exception, it will be
ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
Rejection is better than waiting for the message to timeout.
Rejected messages are immediately requeued.
"""
ack_msg = False
try:
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
ack_msg = True
except Exception:
if self.ack_on_error:
ack_msg = True
LOG.exception(_("Failed to process message"
" ... skipping it."))
else:
LOG.exception(_("Failed to process message"
" ... will requeue."))
finally:
if ack_msg:
message.ack()
else:
message.reject()
def consume(self, *args, **kwargs): def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will """Actually declare the consumer on the amqp channel. This will
start the flow of messages from the queue. Using the start the flow of messages from the queue. Using the
@ -150,8 +182,6 @@ class ConsumerBase(object):
If kwargs['nowait'] is True, then this call will block until If kwargs['nowait'] is True, then this call will block until
a message is read. a message is read.
Messages will automatically be acked if the callback doesn't
raise an exception
""" """
options = {'consumer_tag': self.tag} options = {'consumer_tag': self.tag}
@ -162,13 +192,7 @@ class ConsumerBase(object):
def _callback(raw_message): def _callback(raw_message):
message = self.channel.message_to_python(raw_message) message = self.channel.message_to_python(raw_message)
try: self._callback_handler(message, callback)
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
message.ack()
self.queue.consume(*args, callback=_callback, **options) self.queue.consume(*args, callback=_callback, **options)
@ -233,9 +257,9 @@ class TopicConsumer(ConsumerBase):
Other kombu options may be passed as keyword arguments Other kombu options may be passed as keyword arguments
""" """
# Default options # Default options
options = {'durable': conf.rabbit_durable_queues, options = {'durable': conf.amqp_durable_queues,
'queue_arguments': _get_queue_arguments(conf), 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False, 'auto_delete': conf.amqp_auto_delete,
'exclusive': False} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
@ -339,8 +363,8 @@ class TopicPublisher(Publisher):
Kombu options may be passed as keyword args to override defaults Kombu options may be passed as keyword args to override defaults
""" """
options = {'durable': conf.rabbit_durable_queues, options = {'durable': conf.amqp_durable_queues,
'auto_delete': False, 'auto_delete': conf.amqp_auto_delete,
'exclusive': False} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange_name = rpc_amqp.get_control_exchange(conf) exchange_name = rpc_amqp.get_control_exchange(conf)
@ -370,7 +394,7 @@ class NotifyPublisher(TopicPublisher):
"""Publisher class for 'notify'.""" """Publisher class for 'notify'."""
def __init__(self, conf, channel, topic, **kwargs): def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
self.queue_arguments = _get_queue_arguments(conf) self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
@ -454,7 +478,8 @@ class Connection(object):
# http://docs.python.org/library/ssl.html - ssl.wrap_socket # http://docs.python.org/library/ssl.html - ssl.wrap_socket
if self.conf.kombu_ssl_version: if self.conf.kombu_ssl_version:
ssl_params['ssl_version'] = self.conf.kombu_ssl_version ssl_params['ssl_version'] = sslutils.validate_ssl_version(
self.conf.kombu_ssl_version)
if self.conf.kombu_ssl_keyfile: if self.conf.kombu_ssl_keyfile:
ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
if self.conf.kombu_ssl_certfile: if self.conf.kombu_ssl_certfile:
@ -537,13 +562,11 @@ class Connection(object):
log_info.update(params) log_info.update(params)
if self.max_retries and attempt == self.max_retries: if self.max_retries and attempt == self.max_retries:
LOG.error(_('Unable to connect to AMQP server on ' msg = _('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d ' '%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info) 'tries: %(err_str)s') % log_info
# NOTE(comstud): Copied from original code. There's LOG.error(msg)
# really no better recourse because if this was a queue we raise rpc_common.RPCException(msg)
# need to consume on, we have no way to consume anymore.
sys.exit(1)
if attempt == 1: if attempt == 1:
sleep_time = self.interval_start or 1 sleep_time = self.interval_start or 1
@ -635,8 +658,8 @@ class Connection(object):
def _consume(): def _consume():
if info['do_consume']: if info['do_consume']:
queues_head = self.consumers[:-1] queues_head = self.consumers[:-1] # not fanout.
queues_tail = self.consumers[-1] queues_tail = self.consumers[-1] # fanout
for queue in queues_head: for queue in queues_head:
queue.consume(nowait=True) queue.consume(nowait=True)
queues_tail.consume(nowait=False) queues_tail.consume(nowait=False)
@ -685,11 +708,12 @@ class Connection(object):
self.declare_consumer(DirectConsumer, topic, callback) self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None, queue_name=None, def declare_topic_consumer(self, topic, callback=None, queue_name=None,
exchange_name=None): exchange_name=None, ack_on_error=True):
"""Create a 'topic' consumer.""" """Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer, self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name, name=queue_name,
exchange_name=exchange_name, exchange_name=exchange_name,
ack_on_error=ack_on_error,
), ),
topic, callback) topic, callback)
@ -724,6 +748,7 @@ class Connection(object):
def consume_in_thread(self): def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread.""" """Consumer from all queues/consumers in a greenthread."""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread(): def _consumer_thread():
try: try:
self.consume() self.consume()
@ -754,7 +779,7 @@ class Connection(object):
self.declare_topic_consumer(topic, proxy_cb, pool_name) self.declare_topic_consumer(topic, proxy_cb, pool_name)
def join_consumer_pool(self, callback, pool_name, topic, def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None): exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from """Register as a member of a group of consumers for a given topic from
the specified exchange. the specified exchange.
@ -775,6 +800,7 @@ class Connection(object):
topic=topic, topic=topic,
exchange_name=exchange_name, exchange_name=exchange_name,
callback=callback_wrapper, callback=callback_wrapper,
ack_on_error=ack_on_error,
) )

View File

@ -24,7 +24,8 @@ import eventlet
import greenlet import greenlet
from oslo.config import cfg from oslo.config import cfg
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common import excutils
from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import importutils from muranoapi.openstack.common import importutils
from muranoapi.openstack.common import jsonutils from muranoapi.openstack.common import jsonutils
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
@ -118,10 +119,17 @@ class ConsumerBase(object):
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.reconnect(session) self.connect(session)
def connect(self, session):
"""Declare the reciever on connect."""
self._declare_receiver(session)
def reconnect(self, session): def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect.""" """Re-declare the receiver after a qpid reconnect."""
self._declare_receiver(session)
def _declare_receiver(self, session):
self.session = session self.session = session
self.receiver = session.receiver(self.address) self.receiver = session.receiver(self.address)
self.receiver.capacity = 1 self.receiver.capacity = 1
@ -152,11 +160,15 @@ class ConsumerBase(object):
except Exception: except Exception:
LOG.exception(_("Failed to process message... skipping it.")) LOG.exception(_("Failed to process message... skipping it."))
finally: finally:
# TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message) self.session.acknowledge(message)
def get_receiver(self): def get_receiver(self):
return self.receiver return self.receiver
def get_node_name(self):
return self.address.split(';')[0]
class DirectConsumer(ConsumerBase): class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'.""" """Queue/consumer class for 'direct'."""
@ -169,11 +181,16 @@ class DirectConsumer(ConsumerBase):
'callback' is the callback to call when messages are received 'callback' is the callback to call when messages are received
""" """
super(DirectConsumer, self).__init__(session, callback, super(DirectConsumer, self).__init__(
"%s/%s" % (msg_id, msg_id), session, callback,
{"type": "direct"}, "%s/%s" % (msg_id, msg_id),
msg_id, {"type": "direct"},
{"exclusive": True}) msg_id,
{
"auto-delete": conf.amqp_auto_delete,
"exclusive": True,
"durable": conf.amqp_durable_queues,
})
class TopicConsumer(ConsumerBase): class TopicConsumer(ConsumerBase):
@ -191,9 +208,14 @@ class TopicConsumer(ConsumerBase):
""" """
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback, super(TopicConsumer, self).__init__(
"%s/%s" % (exchange_name, topic), session, callback,
{}, name or topic, {}) "%s/%s" % (exchange_name, topic),
{}, name or topic,
{
"auto-delete": conf.amqp_auto_delete,
"durable": conf.amqp_durable_queues,
})
class FanoutConsumer(ConsumerBase): class FanoutConsumer(ConsumerBase):
@ -206,6 +228,7 @@ class FanoutConsumer(ConsumerBase):
'topic' is the topic to listen on 'topic' is the topic to listen on
'callback' is the callback to call when messages are received 'callback' is the callback to call when messages are received
""" """
self.conf = conf
super(FanoutConsumer, self).__init__( super(FanoutConsumer, self).__init__(
session, callback, session, callback,
@ -214,6 +237,18 @@ class FanoutConsumer(ConsumerBase):
"%s_fanout_%s" % (topic, uuid.uuid4().hex), "%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True}) {"exclusive": True})
def reconnect(self, session):
topic = self.get_node_name().rpartition('_fanout')[0]
params = {
'session': session,
'topic': topic,
'callback': self.callback,
}
self.__init__(conf=self.conf, **params)
super(FanoutConsumer, self).reconnect(session)
class Publisher(object): class Publisher(object):
"""Base Publisher class.""" """Base Publisher class."""
@ -575,6 +610,7 @@ class Connection(object):
def consume_in_thread(self): def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread.""" """Consumer from all queues/consumers in a greenthread."""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread(): def _consumer_thread():
try: try:
self.consume() self.consume()
@ -615,7 +651,7 @@ class Connection(object):
return consumer return consumer
def join_consumer_pool(self, callback, pool_name, topic, def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None): exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from """Register as a member of a group of consumers for a given topic from
the specified exchange. the specified exchange.

View File

@ -27,7 +27,7 @@ import greenlet
from oslo.config import cfg from oslo.config import cfg
from muranoapi.openstack.common import excutils from muranoapi.openstack.common import excutils
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import importutils from muranoapi.openstack.common import importutils
from muranoapi.openstack.common import jsonutils from muranoapi.openstack.common import jsonutils
from muranoapi.openstack.common.rpc import common as rpc_common from muranoapi.openstack.common.rpc import common as rpc_common
@ -358,7 +358,6 @@ class ZmqBaseReactor(ConsumerBase):
def __init__(self, conf): def __init__(self, conf):
super(ZmqBaseReactor, self).__init__() super(ZmqBaseReactor, self).__init__()
self.mapping = {}
self.proxies = {} self.proxies = {}
self.threads = [] self.threads = []
self.sockets = [] self.sockets = []
@ -366,9 +365,8 @@ class ZmqBaseReactor(ConsumerBase):
self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size) self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
def register(self, proxy, in_addr, zmq_type_in, out_addr=None, def register(self, proxy, in_addr, zmq_type_in,
zmq_type_out=None, in_bind=True, out_bind=True, in_bind=True, subscribe=None):
subscribe=None):
LOG.info(_("Registering reactor")) LOG.info(_("Registering reactor"))
@ -384,21 +382,6 @@ class ZmqBaseReactor(ConsumerBase):
LOG.info(_("In reactor registered")) LOG.info(_("In reactor registered"))
if not out_addr:
return
if zmq_type_out not in (zmq.PUSH, zmq.PUB):
raise RPCException("Bad output socktype")
# Items push out.
outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
self.mapping[inq] = outq
self.mapping[outq] = inq
self.sockets.append(outq)
LOG.info(_("Out reactor registered"))
def consume_in_thread(self): def consume_in_thread(self):
def _consume(sock): def _consume(sock):
LOG.info(_("Consuming socket")) LOG.info(_("Consuming socket"))
@ -516,8 +499,7 @@ class ZmqProxy(ZmqBaseReactor):
try: try:
self.register(consumption_proxy, self.register(consumption_proxy,
consume_in, consume_in,
zmq.PULL, zmq.PULL)
out_bind=True)
except zmq.ZMQError: except zmq.ZMQError:
if os.access(ipc_dir, os.X_OK): if os.access(ipc_dir, os.X_OK):
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
@ -559,11 +541,6 @@ class ZmqReactor(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying) #TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv() data = sock.recv()
LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data) LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
if sock in self.mapping:
LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
'data': data})
self.mapping[sock].send(data)
return
proxy = self.proxies[sock] proxy = self.proxies[sock]

View File

@ -23,7 +23,7 @@ import contextlib
import eventlet import eventlet
from oslo.config import cfg from oslo.config import cfg
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging

View File

@ -23,7 +23,7 @@ import json
from oslo.config import cfg from oslo.config import cfg
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
from muranoapi.openstack.common.rpc import matchmaker as mm from muranoapi.openstack.common.rpc import matchmaker as mm

View File

@ -69,7 +69,7 @@ class RpcProxy(object):
v = vers if vers else self.default_version v = vers if vers else self.default_version
if (self.version_cap and not if (self.version_cap and not
rpc_common.version_is_compatible(self.version_cap, v)): rpc_common.version_is_compatible(self.version_cap, v)):
raise rpc_common.RpcVersionCapError(version=self.version_cap) raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
msg['version'] = v msg['version'] = v
def _get_topic(self, topic): def _get_topic(self, topic):

View File

@ -17,7 +17,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
from muranoapi.openstack.common import rpc from muranoapi.openstack.common import rpc
from muranoapi.openstack.common.rpc import dispatcher as rpc_dispatcher from muranoapi.openstack.common.rpc import dispatcher as rpc_dispatcher
@ -32,10 +32,11 @@ class Service(service.Service):
A service enables rpc by listening to queues based on topic and host. A service enables rpc by listening to queues based on topic and host.
""" """
def __init__(self, host, topic, manager=None): def __init__(self, host, topic, manager=None, serializer=None):
super(Service, self).__init__() super(Service, self).__init__()
self.host = host self.host = host
self.topic = topic self.topic = topic
self.serializer = serializer
if manager is None: if manager is None:
self.manager = self self.manager = self
else: else:
@ -48,7 +49,8 @@ class Service(service.Service):
LOG.debug(_("Creating Consumer connection for Service %s") % LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic) self.topic)
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager]) dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
self.serializer)
# Share this same connection for these Consumers # Share this same connection for these Consumers
self.conn.create_consumer(self.topic, dispatcher, fanout=False) self.conn.create_consumer(self.topic, dispatcher, fanout=False)

View File

@ -27,11 +27,12 @@ import sys
import time import time
import eventlet import eventlet
from eventlet import event
import logging as std_logging import logging as std_logging
from oslo.config import cfg from oslo.config import cfg
from muranoapi.openstack.common import eventlet_backdoor from muranoapi.openstack.common import eventlet_backdoor
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import importutils from muranoapi.openstack.common import importutils
from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import log as logging
from muranoapi.openstack.common import threadgroup from muranoapi.openstack.common import threadgroup
@ -51,20 +52,9 @@ class Launcher(object):
:returns: None :returns: None
""" """
self._services = threadgroup.ThreadGroup() self.services = Services()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled() self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
@staticmethod
def run_service(service):
"""Start and wait for a service to finish.
:param service: service to run and wait for.
:returns: None
"""
service.start()
service.wait()
def launch_service(self, service): def launch_service(self, service):
"""Load and start the given service. """Load and start the given service.
@ -73,7 +63,7 @@ class Launcher(object):
""" """
service.backdoor_port = self.backdoor_port service.backdoor_port = self.backdoor_port
self._services.add_thread(self.run_service, service) self.services.add(service)
def stop(self): def stop(self):
"""Stop all services which are currently running. """Stop all services which are currently running.
@ -81,7 +71,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
self._services.stop() self.services.stop()
def wait(self): def wait(self):
"""Waits until all services have been stopped, and then returns. """Waits until all services have been stopped, and then returns.
@ -89,7 +79,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
self._services.wait() self.services.wait()
class SignalExit(SystemExit): class SignalExit(SystemExit):
@ -124,9 +114,13 @@ class ServiceLauncher(Launcher):
except SystemExit as exc: except SystemExit as exc:
status = exc.code status = exc.code
finally: finally:
if rpc:
rpc.cleanup()
self.stop() self.stop()
if rpc:
try:
rpc.cleanup()
except Exception:
# We're shutting down, so it doesn't matter at this point.
LOG.exception(_('Exception during rpc cleanup.'))
return status return status
@ -189,7 +183,8 @@ class ProcessLauncher(object):
random.seed() random.seed()
launcher = Launcher() launcher = Launcher()
launcher.run_service(service) launcher.launch_service(service)
launcher.wait()
def _start_child(self, wrap): def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers: if len(wrap.forktimes) > wrap.workers:
@ -313,15 +308,63 @@ class Service(object):
def __init__(self, threads=1000): def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads) self.tg = threadgroup.ThreadGroup(threads)
# signal that the service is done shutting itself down:
self._done = event.Event()
def start(self): def start(self):
pass pass
def stop(self): def stop(self):
self.tg.stop() self.tg.stop()
self.tg.wait()
# Signal that service cleanup is done:
if not self._done.ready():
self._done.send()
def wait(self):
self._done.wait()
class Services(object):
def __init__(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
self.done = event.Event()
def add(self, service):
self.services.append(service)
self.tg.add_thread(self.run_service, service, self.done)
def stop(self):
# wait for graceful shutdown of services:
for service in self.services:
service.stop()
service.wait()
# Each service has performed cleanup, now signal that the run_service
# wrapper threads can now die:
if not self.done.ready():
self.done.send()
# reap threads:
self.tg.stop()
def wait(self): def wait(self):
self.tg.wait() self.tg.wait()
@staticmethod
def run_service(service, done):
"""Service start wrapper.
:param service: service to run
:param done: event to wait on until a shutdown is triggered
:returns: None
"""
service.start()
done.wait()
def launch(service, workers=None): def launch(service, workers=None):
if workers: if workers:

View File

@ -1,367 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Utilities with minimum-depends for use in setup.py
"""
import email
import os
import re
import subprocess
import sys
from setuptools.command import sdist
def parse_mailmap(mailmap='.mailmap'):
mapping = {}
if os.path.exists(mailmap):
with open(mailmap, 'r') as fp:
for l in fp:
try:
canonical_email, alias = re.match(
r'[^#]*?(<.+>).*(<.+>).*', l).groups()
except AttributeError:
continue
mapping[alias] = canonical_email
return mapping
def _parse_git_mailmap(git_dir, mailmap='.mailmap'):
mailmap = os.path.join(os.path.dirname(git_dir), mailmap)
return parse_mailmap(mailmap)
def canonicalize_emails(changelog, mapping):
"""Takes in a string and an email alias mapping and replaces all
instances of the aliases in the string with their real email.
"""
for alias, email_address in mapping.iteritems():
changelog = changelog.replace(alias, email_address)
return changelog
# Get requirements from the first file that exists
def get_reqs_from_files(requirements_files):
for requirements_file in requirements_files:
if os.path.exists(requirements_file):
with open(requirements_file, 'r') as fil:
return fil.read().split('\n')
return []
def parse_requirements(requirements_files=['requirements.txt',
'tools/pip-requires']):
requirements = []
for line in get_reqs_from_files(requirements_files):
# For the requirements list, we need to inject only the portion
# after egg= so that distutils knows the package it's looking for
# such as:
# -e git://github.com/openstack/nova/master#egg=nova
if re.match(r'\s*-e\s+', line):
requirements.append(re.sub(r'\s*-e\s+.*#egg=(.*)$', r'\1',
line))
# such as:
# http://github.com/openstack/nova/zipball/master#egg=nova
elif re.match(r'\s*https?:', line):
requirements.append(re.sub(r'\s*https?:.*#egg=(.*)$', r'\1',
line))
# -f lines are for index locations, and don't get used here
elif re.match(r'\s*-f\s+', line):
pass
# argparse is part of the standard library starting with 2.7
# adding it to the requirements list screws distro installs
elif line == 'argparse' and sys.version_info >= (2, 7):
pass
else:
requirements.append(line)
return requirements
def parse_dependency_links(requirements_files=['requirements.txt',
'tools/pip-requires']):
dependency_links = []
# dependency_links inject alternate locations to find packages listed
# in requirements
for line in get_reqs_from_files(requirements_files):
# skip comments and blank lines
if re.match(r'(\s*#)|(\s*$)', line):
continue
# lines with -e or -f need the whole line, minus the flag
if re.match(r'\s*-[ef]\s+', line):
dependency_links.append(re.sub(r'\s*-[ef]\s+', '', line))
# lines that are only urls can go in unmolested
elif re.match(r'\s*https?:', line):
dependency_links.append(line)
return dependency_links
def _run_shell_command(cmd, throw_on_error=False):
if os.name == 'nt':
output = subprocess.Popen(["cmd.exe", "/C", cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
else:
output = subprocess.Popen(["/bin/sh", "-c", cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out = output.communicate()
if output.returncode and throw_on_error:
raise Exception("%s returned %d" % cmd, output.returncode)
if len(out) == 0:
return None
if len(out[0].strip()) == 0:
return None
return out[0].strip()
def _get_git_directory():
parent_dir = os.path.dirname(__file__)
while True:
git_dir = os.path.join(parent_dir, '.git')
if os.path.exists(git_dir):
return git_dir
parent_dir, child = os.path.split(parent_dir)
if not child: # reached to root dir
return None
def write_git_changelog():
"""Write a changelog based on the git changelog."""
new_changelog = 'ChangeLog'
git_dir = _get_git_directory()
if not os.getenv('SKIP_WRITE_GIT_CHANGELOG'):
if git_dir:
git_log_cmd = 'git --git-dir=%s log' % git_dir
changelog = _run_shell_command(git_log_cmd)
mailmap = _parse_git_mailmap(git_dir)
with open(new_changelog, "w") as changelog_file:
changelog_file.write(canonicalize_emails(changelog, mailmap))
else:
open(new_changelog, 'w').close()
def generate_authors():
"""Create AUTHORS file using git commits."""
jenkins_email = 'jenkins@review.(openstack|stackforge).org'
old_authors = 'AUTHORS.in'
new_authors = 'AUTHORS'
git_dir = _get_git_directory()
if not os.getenv('SKIP_GENERATE_AUTHORS'):
if git_dir:
# don't include jenkins email address in AUTHORS file
git_log_cmd = ("git --git-dir=" + git_dir +
" log --format='%aN <%aE>' | sort -u | "
"egrep -v '" + jenkins_email + "'")
changelog = _run_shell_command(git_log_cmd)
signed_cmd = ("git log --git-dir=" + git_dir +
" | grep -i Co-authored-by: | sort -u")
signed_entries = _run_shell_command(signed_cmd)
if signed_entries:
new_entries = "\n".join(
[signed.split(":", 1)[1].strip()
for signed in signed_entries.split("\n") if signed])
changelog = "\n".join((changelog, new_entries))
mailmap = _parse_git_mailmap(git_dir)
with open(new_authors, 'w') as new_authors_fh:
new_authors_fh.write(canonicalize_emails(changelog, mailmap))
if os.path.exists(old_authors):
with open(old_authors, "r") as old_authors_fh:
new_authors_fh.write('\n' + old_authors_fh.read())
else:
open(new_authors, 'w').close()
_rst_template = """%(heading)s
%(underline)s
.. automodule:: %(module)s
:members:
:undoc-members:
:show-inheritance:
"""
def get_cmdclass():
"""Return dict of commands to run from setup.py."""
cmdclass = dict()
def _find_modules(arg, dirname, files):
for filename in files:
if filename.endswith('.py') and filename != '__init__.py':
arg["%s.%s" % (dirname.replace('/', '.'),
filename[:-3])] = True
class LocalSDist(sdist.sdist):
"""Builds the ChangeLog and Authors files from VC first."""
def run(self):
write_git_changelog()
generate_authors()
# sdist.sdist is an old style class, can't use super()
sdist.sdist.run(self)
cmdclass['sdist'] = LocalSDist
# If Sphinx is installed on the box running setup.py,
# enable setup.py to build the documentation, otherwise,
# just ignore it
try:
from sphinx.setup_command import BuildDoc
class LocalBuildDoc(BuildDoc):
builders = ['html', 'man']
def generate_autoindex(self):
print "**Autodocumenting from %s" % os.path.abspath(os.curdir)
modules = {}
option_dict = self.distribution.get_option_dict('build_sphinx')
source_dir = os.path.join(option_dict['source_dir'][1], 'api')
if not os.path.exists(source_dir):
os.makedirs(source_dir)
for pkg in self.distribution.packages:
if '.' not in pkg:
os.path.walk(pkg, _find_modules, modules)
module_list = modules.keys()
module_list.sort()
autoindex_filename = os.path.join(source_dir, 'autoindex.rst')
with open(autoindex_filename, 'w') as autoindex:
autoindex.write(""".. toctree::
:maxdepth: 1
""")
for module in module_list:
output_filename = os.path.join(source_dir,
"%s.rst" % module)
heading = "The :mod:`%s` Module" % module
underline = "=" * len(heading)
values = dict(module=module, heading=heading,
underline=underline)
print "Generating %s" % output_filename
with open(output_filename, 'w') as output_file:
output_file.write(_rst_template % values)
autoindex.write(" %s.rst\n" % module)
def run(self):
if not os.getenv('SPHINX_DEBUG'):
self.generate_autoindex()
for builder in self.builders:
self.builder = builder
self.finalize_options()
self.project = self.distribution.get_name()
self.version = self.distribution.get_version()
self.release = self.distribution.get_version()
BuildDoc.run(self)
class LocalBuildLatex(LocalBuildDoc):
builders = ['latex']
cmdclass['build_sphinx'] = LocalBuildDoc
cmdclass['build_sphinx_latex'] = LocalBuildLatex
except ImportError:
pass
return cmdclass
def _get_revno(git_dir):
"""Return the number of commits since the most recent tag.
We use git-describe to find this out, but if there are no
tags then we fall back to counting commits since the beginning
of time.
"""
describe = _run_shell_command(
"git --git-dir=%s describe --always" % git_dir)
if "-" in describe:
return describe.rsplit("-", 2)[-2]
# no tags found
revlist = _run_shell_command(
"git --git-dir=%s rev-list --abbrev-commit HEAD" % git_dir)
return len(revlist.splitlines())
def _get_version_from_git(pre_version):
"""Return a version which is equal to the tag that's on the current
revision if there is one, or tag plus number of additional revisions
if the current revision has no tag."""
git_dir = _get_git_directory()
if git_dir:
if pre_version:
try:
return _run_shell_command(
"git --git-dir=" + git_dir + " describe --exact-match",
throw_on_error=True).replace('-', '.')
except Exception:
sha = _run_shell_command(
"git --git-dir=" + git_dir + " log -n1 --pretty=format:%h")
return "%s.a%s.g%s" % (pre_version, _get_revno(git_dir), sha)
else:
return _run_shell_command(
"git --git-dir=" + git_dir + " describe --always").replace(
'-', '.')
return None
def _get_version_from_pkg_info(package_name):
"""Get the version from PKG-INFO file if we can."""
try:
pkg_info_file = open('PKG-INFO', 'r')
except (IOError, OSError):
return None
try:
pkg_info = email.message_from_file(pkg_info_file)
except email.MessageError:
return None
# Check to make sure we're in our own dir
if pkg_info.get('Name', None) != package_name:
return None
return pkg_info.get('Version', None)
def get_version(package_name, pre_version=None):
"""Get the version of the project. First, try getting it from PKG-INFO, if
it exists. If it does, that means we're in a distribution tarball or that
install has happened. Otherwise, if there is no PKG-INFO file, pull the
version from git.
We do not support setup.py version sanity in git archive tarballs, nor do
we support packagers directly sucking our git repo into theirs. We expect
that a source tarball be made from our git repo - or that if someone wants
to make a source tarball from a fork of our repo with additional tags in it
that they understand and desire the results of doing that.
"""
version = os.environ.get("OSLO_PACKAGE_VERSION", None)
if version:
return version
version = _get_version_from_pkg_info(package_name)
if version:
return version
version = _get_version_from_git(pre_version)
if version:
return version
raise Exception("Versioning for this project requires either an sdist"
" tarball, or access to an upstream git repository.")

View File

@ -19,7 +19,7 @@ import ssl
from oslo.config import cfg from oslo.config import cfg
from muranoapi.openstack.common.gettextutils import _ from muranoapi.openstack.common.gettextutils import _ # noqa
ssl_opts = [ ssl_opts = [
@ -78,3 +78,23 @@ def wrap(sock):
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
return ssl.wrap_socket(sock, **ssl_kwargs) return ssl.wrap_socket(sock, **ssl_kwargs)
_SSL_PROTOCOLS = {
"tlsv1": ssl.PROTOCOL_TLSv1,
"sslv23": ssl.PROTOCOL_SSLv23,
"sslv3": ssl.PROTOCOL_SSLv3
}
try:
_SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
except AttributeError:
pass
def validate_ssl_version(version):
key = version.lower()
try:
return _SSL_PROTOCOLS[key]
except KeyError:
raise RuntimeError(_("Invalid SSL version : %s") % version)

View File

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from eventlet import greenlet import eventlet
from eventlet import greenpool from eventlet import greenpool
from eventlet import greenthread from eventlet import greenthread
@ -105,7 +105,7 @@ class ThreadGroup(object):
for x in self.timers: for x in self.timers:
try: try:
x.wait() x.wait()
except greenlet.GreenletExit: except eventlet.greenlet.GreenletExit:
pass pass
except Exception as ex: except Exception as ex:
LOG.exception(ex) LOG.exception(ex)
@ -115,7 +115,7 @@ class ThreadGroup(object):
continue continue
try: try:
x.wait() x.wait()
except greenlet.GreenletExit: except eventlet.greenlet.GreenletExit:
pass pass
except Exception as ex: except Exception as ex:
LOG.exception(ex) LOG.exception(ex)

View File

@ -23,6 +23,7 @@ import calendar
import datetime import datetime
import iso8601 import iso8601
import six
# ISO 8601 extended time format with microseconds # ISO 8601 extended time format with microseconds
@ -75,14 +76,14 @@ def normalize_time(timestamp):
def is_older_than(before, seconds): def is_older_than(before, seconds):
"""Return True if before is older than seconds.""" """Return True if before is older than seconds."""
if isinstance(before, basestring): if isinstance(before, six.string_types):
before = parse_strtime(before).replace(tzinfo=None) before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds) return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds): def is_newer_than(after, seconds):
"""Return True if after is newer than seconds.""" """Return True if after is newer than seconds."""
if isinstance(after, basestring): if isinstance(after, six.string_types):
after = parse_strtime(after).replace(tzinfo=None) after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds) return after - utcnow() > datetime.timedelta(seconds=seconds)

View File

@ -1,94 +0,0 @@
# Copyright 2012 OpenStack Foundation
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Utilities for consuming the version from pkg_resources.
"""
import pkg_resources
class VersionInfo(object):
def __init__(self, package):
"""Object that understands versioning for a package
:param package: name of the python package, such as glance, or
python-glanceclient
"""
self.package = package
self.release = None
self.version = None
self._cached_version = None
def __str__(self):
"""Make the VersionInfo object behave like a string."""
return self.version_string()
def __repr__(self):
"""Include the name."""
return "VersionInfo(%s:%s)" % (self.package, self.version_string())
def _get_version_from_pkg_resources(self):
"""Get the version of the package from the pkg_resources record
associated with the package."""
try:
requirement = pkg_resources.Requirement.parse(self.package)
provider = pkg_resources.get_provider(requirement)
return provider.version
except pkg_resources.DistributionNotFound:
# The most likely cause for this is running tests in a tree
# produced from a tarball where the package itself has not been
# installed into anything. Revert to setup-time logic.
from muranoapi.openstack.common import setup
return setup.get_version(self.package)
def release_string(self):
"""Return the full version of the package including suffixes indicating
VCS status.
"""
if self.release is None:
self.release = self._get_version_from_pkg_resources()
return self.release
def version_string(self):
"""Return the short version minus any alpha/beta tags."""
if self.version is None:
parts = []
for part in self.release_string().split('.'):
if part[0].isdigit():
parts.append(part)
else:
break
self.version = ".".join(parts)
return self.version
# Compatibility functions
canonical_version_string = version_string
version_string_with_vcs = release_string
def cached_version_string(self, prefix=""):
"""Generate an object which will expand in a string context to
the results of version_string(). We do this so that don't
call into pkg_resources every time we start up a program when
passing version information into the CONF constructor, but
rather only do the calculation when and if a version is requested
"""
if not self._cached_version:
self._cached_version = "%s%s" % (prefix,
self.version_string())
return self._cached_version

View File

@ -1,7 +1,6 @@
[DEFAULT] [DEFAULT]
# The list of modules to copy from openstack-common # The list of modules to copy from openstack-common
module=setup
module=wsgi module=wsgi
module=config module=config
module=exception module=exception
@ -15,7 +14,6 @@ module=service
module=notifier module=notifier
module=local module=local
module=install_venv_common module=install_venv_common
module=version
module=timeutils module=timeutils
module=eventlet_backdoor module=eventlet_backdoor
module=threadgroup module=threadgroup

View File

@ -1,3 +1,5 @@
d2to1>=0.2.10,<0.3
pbr>=0.5,<0.6
Babel Babel
SQLAlchemy>=0.7,<=0.7.9 SQLAlchemy>=0.7,<=0.7.9
anyjson anyjson
@ -13,8 +15,8 @@ httplib2
kombu kombu
pycrypto>=2.1.0alpha1 pycrypto>=2.1.0alpha1
iso8601>=0.1.4 iso8601>=0.1.4
amqplib
six six
netaddr
# Note you will need gcc buildtools installed and must # Note you will need gcc buildtools installed and must
# have installed libxml headers for lxml to be successfully # have installed libxml headers for lxml to be successfully
@ -28,5 +30,5 @@ Paste
passlib passlib
jsonschema==2.0.0 jsonschema==2.0.0
python-keystoneclient>=0.2.0 python-keystoneclient>=0.2.0
oslo.config oslo.config
http://github.com/sergmelikyan/murano-common/releases/download/0.1/muranocommon-0.1.tar.gz#egg=muranocommon-0.1

View File

@ -1,3 +1,48 @@
# Copyright (c) 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
[metadata]
name = muranoapi
summary = Murano API
description-file =
README.rst
license = Apache License, Version 2.0
author = Mirantis, Inc.
author-email = murano-all@lists.openstack.org
home-page = htts://launchpad.net/murano
classifier =
Development Status :: 5 - Production/Stable
Environment :: OpenStack
Intended Audience :: Developers
Intended Audience :: Information Technology
License :: OSI Approved :: Apache Software License
Operating System :: OS Independent
Programming Language :: Python
[files]
packages =
muranoapi
[global]
setup-hooks =
pbr.hooks.setup_hook
[entry_points]
console_scripts =
murano-api = muranoapi.cmd.api:main
[build_sphinx] [build_sphinx]
all_files = 1 all_files = 1
build-dir = doc/build build-dir = doc/build
@ -21,13 +66,3 @@ input_file = muranoapi/locale/muranoapi.pot
keywords = _ gettext ngettext l_ lazy_gettext keywords = _ gettext ngettext l_ lazy_gettext
mapping_file = babel.cfg mapping_file = babel.cfg
output_file = muranoapi/locale/muranoapi.pot output_file = muranoapi/locale/muranoapi.pot
[nosetests]
# NOTE(jkoelker) To run the test suite under nose install the following
# coverage http://pypi.python.org/pypi/coverage
# tissue http://pypi.python.org/pypi/tissue (pep8 checker)
# openstack-nose https://github.com/jkoelker/openstack-nose
verbosity=2
cover-package = muranoapi
cover-html = true
cover-erase = true

View File

@ -16,34 +16,11 @@
import setuptools import setuptools
from muranoapi.openstack.common import setup
requires = setup.parse_requirements()
depend_links = setup.parse_dependency_links()
project = 'muranoapi'
setuptools.setup( setuptools.setup(
name=project, setup_requires=[
version=setup.get_version(project, '2013.1'), 'd2to1>=0.2.10,<0.3',
description='The Murano Project API', 'pbr>=0.5,<0.6'
license='Apache License (2.0)',
author='Mirantis, Inc',
author_email='smelikyan@mirantis.com',
url='http://muranoapi.mirantis.com/',
packages=setuptools.find_packages(exclude=['bin']),
test_suite='nose.collector',
cmdclass=setup.get_cmdclass(),
include_package_data=True,
install_requires=requires,
dependency_links=depend_links,
classifiers=[
'Development Status :: 4 - Beta',
'License :: OSI Approved :: Apache Software License',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python :: 2.7',
'Environment :: No Input/Output (Daemon)',
'Environment :: OpenStack',
], ],
scripts=['bin/murano-api'], d2to1=True,
py_modules=[]
) )

69
tools/config/generate_sample.sh Executable file
View File

@ -0,0 +1,69 @@
#!/usr/bin/env bash
print_hint() {
echo "Try \`${0##*/} --help' for more information." >&2
}
PARSED_OPTIONS=$(getopt -n "${0##*/}" -o hb:p:o: \
--long help,base-dir:,package-name:,output-dir: -- "$@")
if [ $? != 0 ] ; then print_hint ; exit 1 ; fi
eval set -- "$PARSED_OPTIONS"
while true; do
case "$1" in
-h|--help)
echo "${0##*/} [options]"
echo ""
echo "options:"
echo "-h, --help show brief help"
echo "-b, --base-dir=DIR Project base directory (required)"
echo "-p, --package-name=NAME Project package name"
echo "-o, --output-dir=DIR File output directory"
exit 0
;;
-b|--base-dir)
shift
BASEDIR=`echo $1 | sed -e 's/\/*$//g'`
shift
;;
-p|--package-name)
shift
PACKAGENAME=`echo $1`
shift
;;
-o|--output-dir)
shift
OUTPUTDIR=`echo $1 | sed -e 's/\/*$//g'`
shift
;;
--)
break
;;
esac
done
if [ -z $BASEDIR ] || ! [ -d $BASEDIR ]
then
echo "${0##*/}: missing project base directory" >&2 ; print_hint ; exit 1
fi
PACKAGENAME=${PACKAGENAME:-${BASEDIR##*/}}
OUTPUTDIR=${OUTPUTDIR:-$BASEDIR/etc}
if ! [ -d $OUTPUTDIR ]
then
echo "${0##*/}: cannot access \`$OUTPUTDIR': No such file or directory" >&2
exit 1
fi
BASEDIRESC=`echo $BASEDIR | sed -e 's/\//\\\\\//g'`
FILES=$(find $BASEDIR/$PACKAGENAME -type f -name "*.py" ! -path "*/tests/*" \
-exec grep -l "Opt(" {} + | sed -e "s/^$BASEDIRESC\///g" | sort -u)
export EVENTLET_NO_GREENDNS=yes
MODULEPATH=muranoapi.openstack.common.config.generator
OUTPUTFILE=$OUTPUTDIR/$PACKAGENAME.conf.sample
python -m $MODULEPATH $FILES > $OUTPUTFILE

View File

@ -4,72 +4,74 @@
# Administrator of the National Aeronautics and Space Administration. # Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved. # All Rights Reserved.
# #
# Copyright 2010 OpenStack LLC. # Copyright 2010 OpenStack Foundation
# Copyright 2013 IBM Corp. # Copyright 2013 IBM Corp.
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
"""
Installation script for Murano API's development virtualenv
"""
import ConfigParser
import os import os
import subprocess
import sys import sys
import install_venv_common as install_venv import install_venv_common as install_venv # flake8: noqa
def print_help(): def print_help(project, venv, root):
help = """ help = """
Murano API development environment setup is complete. %(project)s development environment setup is complete.
Murano API development uses virtualenv to track and manage Python dependencies %(project)s development uses virtualenv to track and manage Python
while in development and testing. dependencies while in development and testing.
To activate the Murano API virtualenv for the extent of your current shell session To activate the %(project)s virtualenv for the extent of your current
you can run: shell session you can run:
$ source .venv/bin/activate $ source %(venv)s/bin/activate
Or, if you prefer, you can run commands in the virtualenv on a case by case Or, if you prefer, you can run commands in the virtualenv on a case by
basis by running: case basis by running:
$ tools/with_venv.sh <your command> $ %(root)s/tools/with_venv.sh <your command>
Also, make test will automatically use the virtualenv.
""" """
print help print help % dict(project=project, venv=venv, root=root)
def main(argv): def main(argv):
root = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) root = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
if os.environ.get('tools_path'):
root = os.environ['tools_path']
venv = os.path.join(root, '.venv') venv = os.path.join(root, '.venv')
pip_requires = os.path.join(root, 'tools', 'pip-requires') if os.environ.get('venv'):
test_requires = os.path.join(root, 'tools', 'test-requires') venv = os.environ['venv']
pip_requires = os.path.join(root, 'requirements.txt')
test_requires = os.path.join(root, 'test-requirements.txt')
py_version = "python%s.%s" % (sys.version_info[0], sys.version_info[1]) py_version = "python%s.%s" % (sys.version_info[0], sys.version_info[1])
project = 'muranoapi' setup_cfg = ConfigParser.ConfigParser()
install = install_venv.InstallVenv(root, venv, pip_requires, test_requires, setup_cfg.read('setup.cfg')
py_version, project) project = setup_cfg.get('metadata', 'name')
install = install_venv.InstallVenv(
root, venv, pip_requires, test_requires, py_version, project)
options = install.parse_args(argv) options = install.parse_args(argv)
install.check_python_version() install.check_python_version()
install.check_dependencies() install.check_dependencies()
install.create_virtualenv(no_site_packages=options.no_site_packages) install.create_virtualenv(no_site_packages=options.no_site_packages)
install.install_dependencies() install.install_dependencies()
install.run_command([os.path.join(venv, 'bin/python'),
'setup.py', 'develop'])
install.post_process() install.post_process()
print_help() print_help(project, venv, root)
if __name__ == '__main__': if __name__ == '__main__':
main(sys.argv) main(sys.argv)

View File

@ -34,12 +34,13 @@ import sys
class InstallVenv(object): class InstallVenv(object):
def __init__(self, root, venv, pip_requires, test_requires, py_version, def __init__(self, root, venv, requirements,
test_requirements, py_version,
project): project):
self.root = root self.root = root
self.venv = venv self.venv = venv
self.pip_requires = pip_requires self.requirements = requirements
self.test_requires = test_requires self.test_requirements = test_requirements
self.py_version = py_version self.py_version = py_version
self.project = project self.project = project
@ -75,11 +76,13 @@ class InstallVenv(object):
def get_distro(self): def get_distro(self):
if (os.path.exists('/etc/fedora-release') or if (os.path.exists('/etc/fedora-release') or
os.path.exists('/etc/redhat-release')): os.path.exists('/etc/redhat-release')):
return Fedora(self.root, self.venv, self.pip_requires, return Fedora(
self.test_requires, self.py_version, self.project) self.root, self.venv, self.requirements,
self.test_requirements, self.py_version, self.project)
else: else:
return Distro(self.root, self.venv, self.pip_requires, return Distro(
self.test_requires, self.py_version, self.project) self.root, self.venv, self.requirements,
self.test_requirements, self.py_version, self.project)
def check_dependencies(self): def check_dependencies(self):
self.get_distro().install_virtualenv() self.get_distro().install_virtualenv()
@ -98,11 +101,6 @@ class InstallVenv(object):
else: else:
self.run_command(['virtualenv', '-q', self.venv]) self.run_command(['virtualenv', '-q', self.venv])
print('done.') print('done.')
print('Installing pip in venv...', end=' ')
if not self.run_command(['tools/with_venv.sh', 'easy_install',
'pip>1.0']).strip():
self.die("Failed to install pip.")
print('done.')
else: else:
print("venv already exists...") print("venv already exists...")
pass pass
@ -116,20 +114,12 @@ class InstallVenv(object):
print('Installing dependencies with pip (this can take a while)...') print('Installing dependencies with pip (this can take a while)...')
# First things first, make sure our venv has the latest pip and # First things first, make sure our venv has the latest pip and
# distribute. # setuptools.
# NOTE: we keep pip at version 1.1 since the most recent version causes self.pip_install('pip>=1.3')
# the .venv creation to fail. See: self.pip_install('setuptools')
# https://bugs.launchpad.net/nova/+bug/1047120
self.pip_install('pip==1.1')
self.pip_install('distribute')
# Install greenlet by hand - just listing it in the requires file does self.pip_install('-r', self.requirements)
# not self.pip_install('-r', self.test_requirements)
# get it installed in the right order
self.pip_install('greenlet')
self.pip_install('-r', self.pip_requires)
self.pip_install('-r', self.test_requires)
def post_process(self): def post_process(self):
self.get_distro().post_process() self.get_distro().post_process()

View File

@ -8,8 +8,8 @@ setenv = VIRTUAL_ENV={envdir}
NOSE_OPENSTACK_RED=0.05 NOSE_OPENSTACK_RED=0.05
NOSE_OPENSTACK_YELLOW=0.025 NOSE_OPENSTACK_YELLOW=0.025
NOSE_OPENSTACK_SHOW_ELAPSED=1 NOSE_OPENSTACK_SHOW_ELAPSED=1
deps = -r{toxinidir}/tools/pip-requires deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/tools/test-requires -r{toxinidir}/test-requirements.txt
commands = nosetests commands = nosetests
[testenv:pep8] [testenv:pep8]