From 072b593de6f0fb007ecd699b3d1da950cbcef0f6 Mon Sep 17 00:00:00 2001 From: Serg Melikyan Date: Tue, 23 Jul 2013 14:29:18 +0400 Subject: [PATCH] Migrated to Murano Common Replaced old RabbitMQ code over amqplib to Murano Common MqClient Change-Id: Icf721f9304f65df02231bde77c8472fd6e20c90d --- etc/murano-api-paste.ini | 2 +- muranoapi/__init__.py | 4 + muranoapi/api/v1/environments.py | 4 +- muranoapi/{version.py => cmd/__init__.py} | 4 - bin/murano-api => muranoapi/cmd/api.py | 15 +- muranoapi/common/config.py | 7 +- muranoapi/common/service.py | 94 ++--- muranoapi/db/services/environments.py | 31 +- muranoapi/db/services/sessions.py | 30 +- .../openstack/common/config/generator.py | 53 +-- muranoapi/openstack/common/context.py | 2 +- muranoapi/openstack/common/db/exception.py | 2 +- .../common/db/sqlalchemy/migration.py | 159 ++++++++ .../openstack/common/db/sqlalchemy/models.py | 12 +- .../openstack/common/db/sqlalchemy/session.py | 23 +- .../openstack/common/db/sqlalchemy/utils.py | 342 +++++++++++++++- .../openstack/common/eventlet_backdoor.py | 62 ++- muranoapi/openstack/common/exception.py | 30 +- muranoapi/openstack/common/excutils.py | 71 +++- muranoapi/openstack/common/fileutils.py | 2 +- muranoapi/openstack/common/gettextutils.py | 37 +- muranoapi/openstack/common/jsonutils.py | 3 + muranoapi/openstack/common/lockutils.py | 185 +++++---- muranoapi/openstack/common/log.py | 12 +- muranoapi/openstack/common/loopingcall.py | 2 +- muranoapi/openstack/common/network_utils.py | 20 +- muranoapi/openstack/common/notifier/api.py | 39 +- .../common/notifier/rabbit_notifier.py | 29 -- .../openstack/common/notifier/rpc_notifier.py | 2 +- .../common/notifier/rpc_notifier2.py | 2 +- muranoapi/openstack/common/rpc/__init__.py | 2 +- muranoapi/openstack/common/rpc/amqp.py | 29 +- muranoapi/openstack/common/rpc/common.py | 67 ++-- muranoapi/openstack/common/rpc/impl_kombu.py | 90 +++-- muranoapi/openstack/common/rpc/impl_qpid.py | 58 ++- muranoapi/openstack/common/rpc/impl_zmq.py | 31 +- muranoapi/openstack/common/rpc/matchmaker.py | 2 +- .../openstack/common/rpc/matchmaker_ring.py | 2 +- muranoapi/openstack/common/rpc/proxy.py | 2 +- muranoapi/openstack/common/rpc/service.py | 8 +- muranoapi/openstack/common/service.py | 81 +++- muranoapi/openstack/common/setup.py | 367 ------------------ muranoapi/openstack/common/sslutils.py | 22 +- muranoapi/openstack/common/threadgroup.py | 6 +- muranoapi/openstack/common/timeutils.py | 5 +- muranoapi/openstack/common/version.py | 94 ----- openstack-common.conf | 2 - tools/pip-requires => requirements.txt | 6 +- setup.cfg | 57 ++- setup.py | 31 +- tools/test-requires => test-requirements.txt | 2 +- tools/config/generate_sample.sh | 69 ++++ tools/install_venv.py | 76 ++-- tools/install_venv_common.py | 40 +- tox.ini | 4 +- 55 files changed, 1342 insertions(+), 1091 deletions(-) rename muranoapi/{version.py => cmd/__init__.py} (83%) rename bin/murano-api => muranoapi/cmd/api.py (74%) mode change 100755 => 100644 muranoapi/openstack/common/config/generator.py create mode 100644 muranoapi/openstack/common/db/sqlalchemy/migration.py mode change 100644 => 100755 muranoapi/openstack/common/db/sqlalchemy/utils.py delete mode 100644 muranoapi/openstack/common/notifier/rabbit_notifier.py delete mode 100644 muranoapi/openstack/common/setup.py delete mode 100644 muranoapi/openstack/common/version.py rename tools/pip-requires => requirements.txt (78%) rename tools/test-requires => test-requirements.txt (94%) create mode 100755 tools/config/generate_sample.sh diff --git a/etc/murano-api-paste.ini b/etc/murano-api-paste.ini index dc839b19..233c022d 100644 --- a/etc/murano-api-paste.ini +++ b/etc/murano-api-paste.ini @@ -1,4 +1,4 @@ -[pipeline:murano-api] +[pipeline:api.py] pipeline = authtoken context apiv1app [app:apiv1app] diff --git a/muranoapi/__init__.py b/muranoapi/__init__.py index eed88220..789694c8 100644 --- a/muranoapi/__init__.py +++ b/muranoapi/__init__.py @@ -14,3 +14,7 @@ import gettext gettext.install('muranoapi', './muranoapi/locale', unicode=1) + +from pbr import version +__version_info = version.VersionInfo('muranoapi') +__version__ = __version_info.cached_version_string() diff --git a/muranoapi/api/v1/environments.py b/muranoapi/api/v1/environments.py index 7f1f6467..ac06b15c 100644 --- a/muranoapi/api/v1/environments.py +++ b/muranoapi/api/v1/environments.py @@ -12,11 +12,10 @@ # License for the specific language governing permissions and limitations # under the License. -import eventlet -from muranoapi.common.utils import build_entity_map from sqlalchemy import desc from webob import exc from muranoapi.common import config +from muranoapi.common.utils import build_entity_map from muranoapi.db.session import get_session from muranoapi.db.models import Environment, Status from muranoapi.db.services.core_services import CoreServices @@ -24,7 +23,6 @@ from muranoapi.db.services.environments import EnvironmentServices from muranoapi.openstack.common import wsgi from muranoapi.openstack.common import log as logging -amqp = eventlet.patcher.import_patched('amqplib.client_0_8') rabbitmq = config.CONF.rabbitmq log = logging.getLogger(__name__) diff --git a/muranoapi/version.py b/muranoapi/cmd/__init__.py similarity index 83% rename from muranoapi/version.py rename to muranoapi/cmd/__init__.py index 3399a4af..7d93825c 100644 --- a/muranoapi/version.py +++ b/muranoapi/cmd/__init__.py @@ -11,7 +11,3 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - -from muranoapi.openstack.common import version as common_version - -version_info = common_version.VersionInfo('muranoapi') diff --git a/bin/murano-api b/muranoapi/cmd/api.py similarity index 74% rename from bin/murano-api rename to muranoapi/cmd/api.py index 0c1c3743..838363b8 100644 --- a/bin/murano-api +++ b/muranoapi/cmd/api.py @@ -14,17 +14,8 @@ # License for the specific language governing permissions and limitations # under the License. -import os import sys -# If ../muranoapi/__init__.py exists, add ../ to Python search path, so that -# it will override what happens to be installed in /usr/(local/)lib/python... -possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), - os.pardir, - os.pardir)) -if os.path.exists(os.path.join(possible_topdir, 'muranoapi', '__init__.py')): - sys.path.insert(0, possible_topdir) - from muranoapi.common import config from muranoapi.common.service import TaskResultHandlerService from muranoapi.openstack.common import log @@ -32,7 +23,7 @@ from muranoapi.openstack.common import wsgi from muranoapi.openstack.common import service -if __name__ == '__main__': +def main(): try: config.parse_args() log.setup('muranoapi') @@ -49,3 +40,7 @@ if __name__ == '__main__': except RuntimeError, e: sys.stderr.write("ERROR: %s\n" % e) sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/muranoapi/common/config.py b/muranoapi/common/config.py index 196d79a6..05fffced 100644 --- a/muranoapi/common/config.py +++ b/muranoapi/common/config.py @@ -30,7 +30,7 @@ from oslo.config import cfg from paste import deploy from muranoapi.openstack.common import log -from muranoapi.version import version_info as version +from muranoapi import __version__ as version paste_deploy_opts = [ cfg.StrOpt('flavor'), @@ -84,8 +84,7 @@ CONF.import_opt('syslog_log_facility', 'muranoapi.openstack.common.log') cfg.set_defaults(log.log_opts, - default_log_levels=['amqplib=WARN', - 'qpid.messaging=INFO', + default_log_levels=['qpid.messaging=INFO', 'sqlalchemy=WARN', 'keystoneclient=INFO', 'eventlet.wsgi.server=WARN']) @@ -94,7 +93,7 @@ cfg.set_defaults(log.log_opts, def parse_args(args=None, usage=None, default_config_files=None): CONF(args=args, project='muranoapi', - version=version.cached_version_string(), + version=version, usage=usage, default_config_files=default_config_files) diff --git a/muranoapi/common/service.py b/muranoapi/common/service.py index eaf6419a..0297d961 100644 --- a/muranoapi/common/service.py +++ b/muranoapi/common/service.py @@ -12,75 +12,63 @@ # License for the specific language governing permissions and limitations # under the License. -import socket - -from amqplib.client_0_8 import AMQPConnectionException -import anyjson -import eventlet -from muranoapi.common.utils import retry, handle +from muranoapi.common.utils import handle from muranoapi.db.models import Status, Session, Environment, Deployment from muranoapi.db.session import get_session -from muranoapi.openstack.common import log as logging, timeutils +from muranoapi.openstack.common import log as logging, timeutils, service from muranoapi.common import config +from muranocommon.mq import MqClient from sqlalchemy import desc -amqp = eventlet.patcher.import_patched('amqplib.client_0_8') conf = config.CONF.reports rabbitmq = config.CONF.rabbitmq log = logging.getLogger(__name__) -class TaskResultHandlerService(): - thread = None +class TaskResultHandlerService(service.Service): + connection_params = { + 'login': rabbitmq.login, + 'password': rabbitmq.password, + 'host': rabbitmq.host, + 'port': rabbitmq.port, + 'virtual_host': rabbitmq.virtual_host + } + + def __init__(self): + super(TaskResultHandlerService, self).__init__() def start(self): - self.thread = eventlet.spawn(self.connect) + super(TaskResultHandlerService, self).start() + self.tg.add_thread(self._start_rabbitmq) def stop(self): - pass + super(TaskResultHandlerService, self).stop() - def wait(self): - self.thread.wait() - - @retry((socket.error, AMQPConnectionException), tries=-1) - def connect(self): - connection = amqp.Connection('{0}:{1}'. - format(rabbitmq.host, rabbitmq.port), - virtual_host=rabbitmq.virtual_host, - userid=rabbitmq.login, - password=rabbitmq.password, - ssl=rabbitmq.use_ssl, insist=True) - ch = connection.channel() - - def bind(exchange, queue): - if not exchange: - ch.exchange_declare(exchange, 'direct', durable=True, - auto_delete=False) - ch.queue_declare(queue, durable=True, auto_delete=False) - if not exchange: - ch.queue_bind(queue, exchange, queue) - - bind(conf.results_exchange, conf.results_queue) - bind(conf.reports_exchange, conf.reports_queue) - - ch.basic_consume(conf.results_exchange, callback=handle_result) - ch.basic_consume(conf.reports_exchange, callback=handle_report, - no_ack=True) - while ch.callbacks: - ch.wait() + def _start_rabbitmq(self): + while True: + try: + with MqClient(**self.connection_params) as mqClient: + mqClient.declare(conf.results_exchange, conf.results_queue) + mqClient.declare(conf.reports_exchange, conf.reports_queue) + with mqClient.open(conf.results_queue) as results_sb: + with mqClient.open(conf.reports_queue) as reports_sb: + while True: + report = reports_sb.get_message(timeout=1000) + self.tg.add_thread(handle_report, report.body) + result = results_sb.get_message(timeout=1000) + self.tg.add_thread(handle_result, result.body) + except Exception as ex: + log.exception(ex) @handle -def handle_result(msg): +def handle_result(environment_result): log.debug(_('Got result message from ' - 'orchestration engine:\n{0}'.format(msg.body))) + 'orchestration engine:\n{0}'.format(environment_result))) - environment_result = anyjson.deserialize(msg.body) if 'deleted' in environment_result: log.debug(_('Result for environment {0} is dropped. Environment ' 'is deleted'.format(environment_result['id']))) - - msg.channel.basic_ack(msg.delivery_tag) return session = get_session() @@ -108,20 +96,18 @@ def handle_result(msg): status.text = "Deployment finished" deployment.statuses.append(status) deployment.save(session) - msg.channel.basic_ack(msg.delivery_tag) @handle -def handle_report(msg): +def handle_report(report): log.debug(_('Got report message from orchestration ' - 'engine:\n{0}'.format(msg.body))) + 'engine:\n{0}'.format(report))) - params = anyjson.deserialize(msg.body) - params['entity_id'] = params['id'] - del params['id'] + report['entity_id'] = report['id'] + del report['id'] status = Status() - status.update(params) + status.update(report) session = get_session() #connect with deployment @@ -136,4 +122,4 @@ def get_last_deployment(session, env_id): query = session.query(Deployment). \ filter_by(environment_id=env_id). \ order_by(desc(Deployment.started)) - return query.first() + return query.first() \ No newline at end of file diff --git a/muranoapi/db/services/environments.py b/muranoapi/db/services/environments.py index 66efcff3..c20f2d68 100644 --- a/muranoapi/db/services/environments.py +++ b/muranoapi/db/services/environments.py @@ -13,18 +13,15 @@ # under the License. from collections import namedtuple -from amqplib.client_0_8 import Message -import anyjson -import eventlet from jsonschema import validate from muranoapi.api.v1.schemas import ENV_SCHEMA from muranoapi.common import config from muranoapi.db.models import Session, Environment +from muranoapi.db.services.sessions import SessionServices, SessionState from muranoapi.db.session import get_session -from sessions import SessionServices, SessionState +from muranocommon.mq import MqClient, Message -amqp = eventlet.patcher.import_patched('amqplib.client_0_8') rabbitmq = config.CONF.rabbitmq EnvironmentStatus = namedtuple('EnvironmentStatus', [ @@ -122,18 +119,20 @@ class EnvironmentServices(object): #Set X-Auth-Token for conductor env['token'] = token - connection = amqp.Connection('{0}:{1}'. - format(rabbitmq.host, rabbitmq.port), - virtual_host=rabbitmq.virtual_host, - userid=rabbitmq.login, - password=rabbitmq.password, - ssl=rabbitmq.use_ssl, insist=True) - channel = connection.channel() - channel.exchange_declare('tasks', 'direct', durable=True, - auto_delete=False) + message = Message() + message.body = env - channel.basic_publish(Message(body=anyjson.serialize(env)), 'tasks', - 'tasks') + connection_params = { + 'login': rabbitmq.login, + 'password': rabbitmq.password, + 'host': rabbitmq.host, + 'port': rabbitmq.port, + 'virtual_host': rabbitmq.virtual_host + } + + with MqClient(**connection_params) as mqClient: + mqClient.declare('tasks', 'tasks') + mqClient.send(message, 'tasks', 'tasks') @staticmethod def get_environment_description(environment_id, session_id=None): diff --git a/muranoapi/db/services/sessions.py b/muranoapi/db/services/sessions.py index 6a8c041c..2999532f 100644 --- a/muranoapi/db/services/sessions.py +++ b/muranoapi/db/services/sessions.py @@ -13,15 +13,12 @@ # under the License. from collections import namedtuple -from amqplib.client_0_8 import Message -import anyjson -import eventlet from muranoapi.common import config from muranoapi.db.models import Session, Environment, Deployment, Status from muranoapi.db.session import get_session +from muranocommon.mq import MqClient, Message -amqp = eventlet.patcher.import_patched('amqplib.client_0_8') rabbitmq = config.CONF.rabbitmq SessionState = namedtuple('SessionState', ['open', 'deploying', 'deployed'])( @@ -136,16 +133,17 @@ class SessionServices(object): unit.add(session) unit.add(deployment) - connection = amqp.Connection('{0}:{1}'. - format(rabbitmq.host, rabbitmq.port), - virtual_host=rabbitmq.virtual_host, - userid=rabbitmq.login, - password=rabbitmq.password, - ssl=rabbitmq.use_ssl, insist=True) - channel = connection.channel() - channel.exchange_declare('tasks', 'direct', durable=True, - auto_delete=False) + message = Message() + message.body = environment - channel.basic_publish( - Message(body=anyjson.serialize(environment)), 'tasks', 'tasks' - ) + connection_params = { + 'login': rabbitmq.login, + 'password': rabbitmq.password, + 'host': rabbitmq.host, + 'port': rabbitmq.port, + 'virtual_host': rabbitmq.virtual_host + } + + with MqClient(**connection_params) as mqClient: + mqClient.declare('tasks', 'tasks') + mqClient.send(message, 'tasks', 'tasks') diff --git a/muranoapi/openstack/common/config/generator.py b/muranoapi/openstack/common/config/generator.py old mode 100755 new mode 100644 index d068899b..b55e3eef --- a/muranoapi/openstack/common/config/generator.py +++ b/muranoapi/openstack/common/config/generator.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2012 SINA Corporation @@ -16,10 +15,11 @@ # License for the specific language governing permissions and limitations # under the License. # -# @author: Zhongyue Luo, SINA Corporation. -# + """Extracts OpenStack config option info from module(s).""" +from __future__ import print_function + import imp import os import re @@ -50,7 +50,6 @@ OPT_TYPES = { MULTISTROPT: 'multi valued', } -OPTION_COUNT = 0 OPTION_REGEX = re.compile(r"(%s)" % "|".join([STROPT, BOOLOPT, INTOPT, FLOATOPT, LISTOPT, MULTISTROPT])) @@ -97,8 +96,6 @@ def generate(srcfiles): for group, opts in opts_by_group.items(): print_group_opts(group, opts) - print "# Total option count: %d" % OPTION_COUNT - def _import_module(mod_str): try: @@ -161,18 +158,16 @@ def _list_opts(obj): def print_group_opts(group, opts_by_module): - print "[%s]" % group - print - global OPTION_COUNT + print("[%s]" % group) + print('') for mod, opts in opts_by_module: - OPTION_COUNT += len(opts) - print '#' - print '# Options defined in %s' % mod - print '#' - print + print('#') + print('# Options defined in %s' % mod) + print('#') + print('') for opt in opts: _print_opt(opt) - print + print('') def _get_my_ip(): @@ -188,7 +183,12 @@ def _get_my_ip(): def _sanitize_default(s): """Set up a reasonably sensible default for pybasedir, my_ip and host.""" - if s.startswith(BASEDIR): + if s.startswith(sys.prefix): + # NOTE(jd) Don't use os.path.join, because it is likely to think the + # second part is an absolute pathname and therefore drop the first + # part. + s = os.path.normpath("/usr/" + s[len(sys.prefix):]) + elif s.startswith(BASEDIR): return s.replace(BASEDIR, '/usr/lib/python/site-packages') elif BASEDIR in s: return s.replace(BASEDIR, '') @@ -205,6 +205,7 @@ def _print_opt(opt): opt_name, opt_default, opt_help = opt.dest, opt.default, opt.help if not opt_help: sys.stderr.write('WARNING: "%s" is missing help string.\n' % opt_name) + opt_help = "" opt_type = None try: opt_type = OPTION_REGEX.search(str(type(opt))).group(0) @@ -212,33 +213,33 @@ def _print_opt(opt): sys.stderr.write("%s\n" % str(err)) sys.exit(1) opt_help += ' (' + OPT_TYPES[opt_type] + ')' - print '#', "\n# ".join(textwrap.wrap(opt_help, WORDWRAP_WIDTH)) + print('#', "\n# ".join(textwrap.wrap(opt_help, WORDWRAP_WIDTH))) try: if opt_default is None: - print '#%s=' % opt_name + print('#%s=' % opt_name) elif opt_type == STROPT: assert(isinstance(opt_default, basestring)) - print '#%s=%s' % (opt_name, _sanitize_default(opt_default)) + print('#%s=%s' % (opt_name, _sanitize_default(opt_default))) elif opt_type == BOOLOPT: assert(isinstance(opt_default, bool)) - print '#%s=%s' % (opt_name, str(opt_default).lower()) + print('#%s=%s' % (opt_name, str(opt_default).lower())) elif opt_type == INTOPT: assert(isinstance(opt_default, int) and not isinstance(opt_default, bool)) - print '#%s=%s' % (opt_name, opt_default) + print('#%s=%s' % (opt_name, opt_default)) elif opt_type == FLOATOPT: assert(isinstance(opt_default, float)) - print '#%s=%s' % (opt_name, opt_default) + print('#%s=%s' % (opt_name, opt_default)) elif opt_type == LISTOPT: assert(isinstance(opt_default, list)) - print '#%s=%s' % (opt_name, ','.join(opt_default)) + print('#%s=%s' % (opt_name, ','.join(opt_default))) elif opt_type == MULTISTROPT: assert(isinstance(opt_default, list)) if not opt_default: opt_default = [''] for default in opt_default: - print '#%s=%s' % (opt_name, default) - print + print('#%s=%s' % (opt_name, default)) + print('') except Exception: sys.stderr.write('Error in option "%s"\n' % opt_name) sys.exit(1) @@ -246,7 +247,7 @@ def _print_opt(opt): def main(): if len(sys.argv) < 2: - print "usage: %s [srcfile]...\n" % sys.argv[0] + print("usage: %s [srcfile]...\n" % sys.argv[0]) sys.exit(0) generate(sys.argv[1:]) diff --git a/muranoapi/openstack/common/context.py b/muranoapi/openstack/common/context.py index 1d9b8215..0c17d1b1 100644 --- a/muranoapi/openstack/common/context.py +++ b/muranoapi/openstack/common/context.py @@ -61,7 +61,7 @@ class RequestContext(object): 'request_id': self.request_id} -def get_admin_context(show_deleted="no"): +def get_admin_context(show_deleted=False): context = RequestContext(None, tenant=None, is_admin=True, diff --git a/muranoapi/openstack/common/db/exception.py b/muranoapi/openstack/common/db/exception.py index c63e13b4..5c2af40c 100644 --- a/muranoapi/openstack/common/db/exception.py +++ b/muranoapi/openstack/common/db/exception.py @@ -18,7 +18,7 @@ """DB related custom exceptions.""" -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa class DBError(Exception): diff --git a/muranoapi/openstack/common/db/sqlalchemy/migration.py b/muranoapi/openstack/common/db/sqlalchemy/migration.py new file mode 100644 index 00000000..e643d8e4 --- /dev/null +++ b/muranoapi/openstack/common/db/sqlalchemy/migration.py @@ -0,0 +1,159 @@ +# coding: utf-8 +# +# Copyright (c) 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# Base on code in migrate/changeset/databases/sqlite.py which is under +# the following license: +# +# The MIT License +# +# Copyright (c) 2009 Evan Rosson, Jan Dittberner, Domen Kožar +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE + +import re + +from migrate.changeset import ansisql +from migrate.changeset.databases import sqlite +from sqlalchemy.schema import UniqueConstraint + + +def _get_unique_constraints(self, table): + """Retrieve information about existing unique constraints of the table + + This feature is needed for _recreate_table() to work properly. + Unfortunately, it's not available in sqlalchemy 0.7.x/0.8.x. + + """ + + data = table.metadata.bind.execute( + """SELECT sql + FROM sqlite_master + WHERE + type='table' AND + name=:table_name""", + table_name=table.name + ).fetchone()[0] + + UNIQUE_PATTERN = "CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)" + return [ + UniqueConstraint( + *[getattr(table.columns, c.strip(' "')) for c in cols.split(",")], + name=name + ) + for name, cols in re.findall(UNIQUE_PATTERN, data) + ] + + +def _recreate_table(self, table, column=None, delta=None, omit_uniques=None): + """Recreate the table properly + + Unlike the corresponding original method of sqlalchemy-migrate this one + doesn't drop existing unique constraints when creating a new one. + + """ + + table_name = self.preparer.format_table(table) + + # we remove all indexes so as not to have + # problems during copy and re-create + for index in table.indexes: + index.drop() + + # reflect existing unique constraints + for uc in self._get_unique_constraints(table): + table.append_constraint(uc) + # omit given unique constraints when creating a new table if required + table.constraints = set([ + cons for cons in table.constraints + if omit_uniques is None or cons.name not in omit_uniques + ]) + + self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name) + self.execute() + + insertion_string = self._modify_table(table, column, delta) + + table.create(bind=self.connection) + self.append(insertion_string % {'table_name': table_name}) + self.execute() + self.append('DROP TABLE migration_tmp') + self.execute() + + +def _visit_migrate_unique_constraint(self, *p, **k): + """Drop the given unique constraint + + The corresponding original method of sqlalchemy-migrate just + raises NotImplemented error + + """ + + self.recreate_table(p[0].table, omit_uniques=[p[0].name]) + + +def patch_migrate(): + """A workaround for SQLite's inability to alter things + + SQLite abilities to alter tables are very limited (please read + http://www.sqlite.org/lang_altertable.html for more details). + E. g. one can't drop a column or a constraint in SQLite. The + workaround for this is to recreate the original table omitting + the corresponding constraint (or column). + + sqlalchemy-migrate library has recreate_table() method that + implements this workaround, but it does it wrong: + + - information about unique constraints of a table + is not retrieved. So if you have a table with one + unique constraint and a migration adding another one + you will end up with a table that has only the + latter unique constraint, and the former will be lost + + - dropping of unique constraints is not supported at all + + The proper way to fix this is to provide a pull-request to + sqlalchemy-migrate, but the project seems to be dead. So we + can go on with monkey-patching of the lib at least for now. + + """ + + # this patch is needed to ensure that recreate_table() doesn't drop + # existing unique constraints of the table when creating a new one + helper_cls = sqlite.SQLiteHelper + helper_cls.recreate_table = _recreate_table + helper_cls._get_unique_constraints = _get_unique_constraints + + # this patch is needed to be able to drop existing unique constraints + constraint_cls = sqlite.SQLiteConstraintDropper + constraint_cls.visit_migrate_unique_constraint = \ + _visit_migrate_unique_constraint + constraint_cls.__bases__ = (ansisql.ANSIColumnDropper, + sqlite.SQLiteConstraintGenerator) diff --git a/muranoapi/openstack/common/db/sqlalchemy/models.py b/muranoapi/openstack/common/db/sqlalchemy/models.py index b5b980bc..3791927e 100644 --- a/muranoapi/openstack/common/db/sqlalchemy/models.py +++ b/muranoapi/openstack/common/db/sqlalchemy/models.py @@ -22,11 +22,13 @@ SQLAlchemy models. """ +import six + from sqlalchemy import Column, Integer from sqlalchemy import DateTime from sqlalchemy.orm import object_mapper -from muranoapi.openstack.common.db.sqlalchemy.session import get_session +from muranoapi.openstack.common.db.sqlalchemy import session as sa from muranoapi.openstack.common import timeutils @@ -37,7 +39,7 @@ class ModelBase(object): def save(self, session=None): """Save this object.""" if not session: - session = get_session() + session = sa.get_session() # NOTE(boris-42): This part of code should be look like: # sesssion.add(self) # session.flush() @@ -70,12 +72,12 @@ class ModelBase(object): return self def next(self): - n = self._i.next() + n = six.advance_iterator(self._i) return n, getattr(self, n) def update(self, values): """Make the model object behave like a dict.""" - for k, v in values.iteritems(): + for k, v in six.iteritems(values): setattr(self, k, v) def iteritems(self): @@ -84,7 +86,7 @@ class ModelBase(object): Includes attributes from joins. """ local = dict(self) - joined = dict([(k, v) for k, v in self.__dict__.iteritems() + joined = dict([(k, v) for k, v in six.iteritems(self.__dict__) if not k[0] == '_']) local.update(joined) return local.iteritems() diff --git a/muranoapi/openstack/common/db/sqlalchemy/session.py b/muranoapi/openstack/common/db/sqlalchemy/session.py index 22310ceb..6dbaf1ed 100644 --- a/muranoapi/openstack/common/db/sqlalchemy/session.py +++ b/muranoapi/openstack/common/db/sqlalchemy/session.py @@ -256,12 +256,10 @@ from sqlalchemy.pool import NullPool, StaticPool from sqlalchemy.sql.expression import literal_column from muranoapi.openstack.common.db import exception -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import timeutils -DEFAULT = 'DEFAULT' - sqlite_db_opts = [ cfg.StrOpt('sqlite_db', default='muranoapi.sqlite', @@ -278,8 +276,6 @@ database_opts = [ '../', '$sqlite_db')), help='The SQLAlchemy connection string used to connect to the ' 'database', - deprecated_name='sql_connection', - deprecated_group=DEFAULT, secret=True), cfg.StrOpt('slave_connection', default='', @@ -288,47 +284,31 @@ database_opts = [ secret=True), cfg.IntOpt('idle_timeout', default=3600, - deprecated_name='sql_idle_timeout', - deprecated_group=DEFAULT, help='timeout before idle sql connections are reaped'), cfg.IntOpt('min_pool_size', default=1, - deprecated_name='sql_min_pool_size', - deprecated_group=DEFAULT, help='Minimum number of SQL connections to keep open in a ' 'pool'), cfg.IntOpt('max_pool_size', default=None, - deprecated_name='sql_max_pool_size', - deprecated_group=DEFAULT, help='Maximum number of SQL connections to keep open in a ' 'pool'), cfg.IntOpt('max_retries', default=10, - deprecated_name='sql_max_retries', - deprecated_group=DEFAULT, help='maximum db connection retries during startup. ' '(setting -1 implies an infinite retry count)'), cfg.IntOpt('retry_interval', default=10, - deprecated_name='sql_retry_interval', - deprecated_group=DEFAULT, help='interval between retries of opening a sql connection'), cfg.IntOpt('max_overflow', default=None, - deprecated_name='sql_max_overflow', - deprecated_group=DEFAULT, help='If set, use this value for max_overflow with sqlalchemy'), cfg.IntOpt('connection_debug', default=0, - deprecated_name='sql_connection_debug', - deprecated_group=DEFAULT, help='Verbosity of SQL debugging information. 0=None, ' '100=Everything'), cfg.BoolOpt('connection_trace', default=False, - deprecated_name='sql_connection_trace', - deprecated_group=DEFAULT, help='Add python stack traces to SQL as comment strings'), cfg.IntOpt('pool_timeout', default=None, @@ -338,6 +318,7 @@ database_opts = [ CONF = cfg.CONF CONF.register_opts(sqlite_db_opts) CONF.register_opts(database_opts, 'database') + LOG = logging.getLogger(__name__) _ENGINE = None diff --git a/muranoapi/openstack/common/db/sqlalchemy/utils.py b/muranoapi/openstack/common/db/sqlalchemy/utils.py old mode 100644 new mode 100755 index a3cba92c..50fd872f --- a/muranoapi/openstack/common/db/sqlalchemy/utils.py +++ b/muranoapi/openstack/common/db/sqlalchemy/utils.py @@ -18,12 +18,28 @@ # License for the specific language governing permissions and limitations # under the License. -"""Implementation of paginate query.""" - import sqlalchemy +from sqlalchemy import Boolean +from sqlalchemy import CheckConstraint +from sqlalchemy import Column +from sqlalchemy.engine import reflection +from sqlalchemy.ext.compiler import compiles +from sqlalchemy import func +from sqlalchemy import Index +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy.sql.expression import literal_column +from sqlalchemy.sql.expression import UpdateBase +from sqlalchemy.sql import select +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy.types import NullType -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa + +from muranoapi.openstack.common import exception from muranoapi.openstack.common import log as logging +from muranoapi.openstack.common import timeutils LOG = logging.getLogger(__name__) @@ -85,11 +101,14 @@ def paginate_query(query, model, limit, sort_keys, marker=None, # Add sorting for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): - sort_dir_func = { - 'asc': sqlalchemy.asc, - 'desc': sqlalchemy.desc, - }[current_sort_dir] - + try: + sort_dir_func = { + 'asc': sqlalchemy.asc, + 'desc': sqlalchemy.desc, + }[current_sort_dir] + except KeyError: + raise ValueError(_("Unknown sort direction, " + "must be 'desc' or 'asc'")) try: sort_key_attr = getattr(model, current_sort_key) except AttributeError: @@ -114,11 +133,8 @@ def paginate_query(query, model, limit, sort_keys, marker=None, model_attr = getattr(model, sort_keys[i]) if sort_dirs[i] == 'desc': crit_attrs.append((model_attr < marker_values[i])) - elif sort_dirs[i] == 'asc': - crit_attrs.append((model_attr > marker_values[i])) else: - raise ValueError(_("Unknown sort direction, " - "must be 'desc' or 'asc'")) + crit_attrs.append((model_attr > marker_values[i])) criteria = sqlalchemy.sql.and_(*crit_attrs) criteria_list.append(criteria) @@ -130,3 +146,305 @@ def paginate_query(query, model, limit, sort_keys, marker=None, query = query.limit(limit) return query + + +def get_table(engine, name): + """Returns an sqlalchemy table dynamically from db. + + Needed because the models don't work for us in migrations + as models will be far out of sync with the current data. + """ + metadata = MetaData() + metadata.bind = engine + return Table(name, metadata, autoload=True) + + +class InsertFromSelect(UpdateBase): + """Form the base for `INSERT INTO table (SELECT ... )` statement.""" + def __init__(self, table, select): + self.table = table + self.select = select + + +@compiles(InsertFromSelect) +def visit_insert_from_select(element, compiler, **kw): + """Form the `INSERT INTO table (SELECT ... )` statement.""" + return "INSERT INTO %s %s" % ( + compiler.process(element.table, asfrom=True), + compiler.process(element.select)) + + +def _get_not_supported_column(col_name_col_instance, column_name): + try: + column = col_name_col_instance[column_name] + except KeyError: + msg = _("Please specify column %s in col_name_col_instance " + "param. It is required because column has unsupported " + "type by sqlite).") + raise exception.OpenstackException(message=msg % column_name) + + if not isinstance(column, Column): + msg = _("col_name_col_instance param has wrong type of " + "column instance for column %s It should be instance " + "of sqlalchemy.Column.") + raise exception.OpenstackException(message=msg % column_name) + return column + + +def drop_old_duplicate_entries_from_table(migrate_engine, table_name, + use_soft_delete, *uc_column_names): + """Drop all old rows having the same values for columns in uc_columns. + + This method drop (or mark ad `deleted` if use_soft_delete is True) old + duplicate rows form table with name `table_name`. + + :param migrate_engine: Sqlalchemy engine + :param table_name: Table with duplicates + :param use_soft_delete: If True - values will be marked as `deleted`, + if False - values will be removed from table + :param uc_column_names: Unique constraint columns + """ + meta = MetaData() + meta.bind = migrate_engine + + table = Table(table_name, meta, autoload=True) + columns_for_group_by = [table.c[name] for name in uc_column_names] + + columns_for_select = [func.max(table.c.id)] + columns_for_select.extend(columns_for_group_by) + + duplicated_rows_select = select(columns_for_select, + group_by=columns_for_group_by, + having=func.count(table.c.id) > 1) + + for row in migrate_engine.execute(duplicated_rows_select): + # NOTE(boris-42): Do not remove row that has the biggest ID. + delete_condition = table.c.id != row[0] + is_none = None # workaround for pyflakes + delete_condition &= table.c.deleted_at == is_none + for name in uc_column_names: + delete_condition &= table.c[name] == row[name] + + rows_to_delete_select = select([table.c.id]).where(delete_condition) + for row in migrate_engine.execute(rows_to_delete_select).fetchall(): + LOG.info(_("Deleting duplicated row with id: %(id)s from table: " + "%(table)s") % dict(id=row[0], table=table_name)) + + if use_soft_delete: + delete_statement = table.update().\ + where(delete_condition).\ + values({ + 'deleted': literal_column('id'), + 'updated_at': literal_column('updated_at'), + 'deleted_at': timeutils.utcnow() + }) + else: + delete_statement = table.delete().where(delete_condition) + migrate_engine.execute(delete_statement) + + +def _get_default_deleted_value(table): + if isinstance(table.c.id.type, Integer): + return 0 + if isinstance(table.c.id.type, String): + return "" + raise exception.OpenstackException( + message=_("Unsupported id columns type")) + + +def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes): + table = get_table(migrate_engine, table_name) + + insp = reflection.Inspector.from_engine(migrate_engine) + real_indexes = insp.get_indexes(table_name) + existing_index_names = dict( + [(index['name'], index['column_names']) for index in real_indexes]) + + # NOTE(boris-42): Restore indexes on `deleted` column + for index in indexes: + if 'deleted' not in index['column_names']: + continue + name = index['name'] + if name in existing_index_names: + column_names = [table.c[c] for c in existing_index_names[name]] + old_index = Index(name, *column_names, unique=index["unique"]) + old_index.drop(migrate_engine) + + column_names = [table.c[c] for c in index['column_names']] + new_index = Index(index["name"], *column_names, unique=index["unique"]) + new_index.create(migrate_engine) + + +def change_deleted_column_type_to_boolean(migrate_engine, table_name, + **col_name_col_instance): + if migrate_engine.name == "sqlite": + return _change_deleted_column_type_to_boolean_sqlite( + migrate_engine, table_name, **col_name_col_instance) + insp = reflection.Inspector.from_engine(migrate_engine) + indexes = insp.get_indexes(table_name) + + table = get_table(migrate_engine, table_name) + + old_deleted = Column('old_deleted', Boolean, default=False) + old_deleted.create(table, populate_default=False) + + table.update().\ + where(table.c.deleted == table.c.id).\ + values(old_deleted=True).\ + execute() + + table.c.deleted.drop() + table.c.old_deleted.alter(name="deleted") + + _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes) + + +def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name, + **col_name_col_instance): + insp = reflection.Inspector.from_engine(migrate_engine) + table = get_table(migrate_engine, table_name) + + columns = [] + for column in table.columns: + column_copy = None + if column.name != "deleted": + if isinstance(column.type, NullType): + column_copy = _get_not_supported_column(col_name_col_instance, + column.name) + else: + column_copy = column.copy() + else: + column_copy = Column('deleted', Boolean, default=0) + columns.append(column_copy) + + constraints = [constraint.copy() for constraint in table.constraints] + + meta = MetaData(bind=migrate_engine) + new_table = Table(table_name + "__tmp__", meta, + *(columns + constraints)) + new_table.create() + + indexes = [] + for index in insp.get_indexes(table_name): + column_names = [new_table.c[c] for c in index['column_names']] + indexes.append(Index(index["name"], *column_names, + unique=index["unique"])) + + c_select = [] + for c in table.c: + if c.name != "deleted": + c_select.append(c) + else: + c_select.append(table.c.deleted == table.c.id) + + ins = InsertFromSelect(new_table, select(c_select)) + migrate_engine.execute(ins) + + table.drop() + [index.create(migrate_engine) for index in indexes] + + new_table.rename(table_name) + new_table.update().\ + where(new_table.c.deleted == new_table.c.id).\ + values(deleted=True).\ + execute() + + +def change_deleted_column_type_to_id_type(migrate_engine, table_name, + **col_name_col_instance): + if migrate_engine.name == "sqlite": + return _change_deleted_column_type_to_id_type_sqlite( + migrate_engine, table_name, **col_name_col_instance) + insp = reflection.Inspector.from_engine(migrate_engine) + indexes = insp.get_indexes(table_name) + + table = get_table(migrate_engine, table_name) + + new_deleted = Column('new_deleted', table.c.id.type, + default=_get_default_deleted_value(table)) + new_deleted.create(table, populate_default=True) + + deleted = True # workaround for pyflakes + table.update().\ + where(table.c.deleted == deleted).\ + values(new_deleted=table.c.id).\ + execute() + table.c.deleted.drop() + table.c.new_deleted.alter(name="deleted") + + _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes) + + +def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name, + **col_name_col_instance): + # NOTE(boris-42): sqlaclhemy-migrate can't drop column with check + # constraints in sqlite DB and our `deleted` column has + # 2 check constraints. So there is only one way to remove + # these constraints: + # 1) Create new table with the same columns, constraints + # and indexes. (except deleted column). + # 2) Copy all data from old to new table. + # 3) Drop old table. + # 4) Rename new table to old table name. + insp = reflection.Inspector.from_engine(migrate_engine) + meta = MetaData(bind=migrate_engine) + table = Table(table_name, meta, autoload=True) + default_deleted_value = _get_default_deleted_value(table) + + columns = [] + for column in table.columns: + column_copy = None + if column.name != "deleted": + if isinstance(column.type, NullType): + column_copy = _get_not_supported_column(col_name_col_instance, + column.name) + else: + column_copy = column.copy() + else: + column_copy = Column('deleted', table.c.id.type, + default=default_deleted_value) + columns.append(column_copy) + + def is_deleted_column_constraint(constraint): + # NOTE(boris-42): There is no other way to check is CheckConstraint + # associated with deleted column. + if not isinstance(constraint, CheckConstraint): + return False + sqltext = str(constraint.sqltext) + return (sqltext.endswith("deleted in (0, 1)") or + sqltext.endswith("deleted IN (:deleted_1, :deleted_2)")) + + constraints = [] + for constraint in table.constraints: + if not is_deleted_column_constraint(constraint): + constraints.append(constraint.copy()) + + new_table = Table(table_name + "__tmp__", meta, + *(columns + constraints)) + new_table.create() + + indexes = [] + for index in insp.get_indexes(table_name): + column_names = [new_table.c[c] for c in index['column_names']] + indexes.append(Index(index["name"], *column_names, + unique=index["unique"])) + + ins = InsertFromSelect(new_table, table.select()) + migrate_engine.execute(ins) + + table.drop() + [index.create(migrate_engine) for index in indexes] + + new_table.rename(table_name) + deleted = True # workaround for pyflakes + new_table.update().\ + where(new_table.c.deleted == deleted).\ + values(deleted=new_table.c.id).\ + execute() + + # NOTE(boris-42): Fix value of deleted column: False -> "" or 0. + deleted = False # workaround for pyflakes + new_table.update().\ + where(new_table.c.deleted == deleted).\ + values(deleted=default_deleted_value).\ + execute() diff --git a/muranoapi/openstack/common/eventlet_backdoor.py b/muranoapi/openstack/common/eventlet_backdoor.py index 57b89ae9..ab909ef8 100644 --- a/muranoapi/openstack/common/eventlet_backdoor.py +++ b/muranoapi/openstack/common/eventlet_backdoor.py @@ -18,8 +18,11 @@ from __future__ import print_function +import errno import gc +import os import pprint +import socket import sys import traceback @@ -28,14 +31,34 @@ import eventlet.backdoor import greenlet from oslo.config import cfg +from muranoapi.openstack.common.gettextutils import _ # noqa +from muranoapi.openstack.common import log as logging + +help_for_backdoor_port = ( + "Acceptable values are 0, , and :, where 0 results " + "in listening on a random tcp port number; results in listening " + "on the specified port number (and not enabling backdoor if that port " + "is in use); and : results in listening on the smallest " + "unused port number within the specified range of port numbers. The " + "chosen port is displayed in the service's log file.") eventlet_backdoor_opts = [ - cfg.IntOpt('backdoor_port', + cfg.StrOpt('backdoor_port', default=None, - help='port for eventlet backdoor to listen') + help="Enable eventlet backdoor. %s" % help_for_backdoor_port) ] CONF = cfg.CONF CONF.register_opts(eventlet_backdoor_opts) +LOG = logging.getLogger(__name__) + + +class EventletBackdoorConfigValueError(Exception): + def __init__(self, port_range, help_msg, ex): + msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. ' + '%(help)s' % + {'range': port_range, 'ex': ex, 'help': help_msg}) + super(EventletBackdoorConfigValueError, self).__init__(msg) + self.port_range = port_range def _dont_use_this(): @@ -60,6 +83,32 @@ def _print_nativethreads(): print() +def _parse_port_range(port_range): + if ':' not in port_range: + start, end = port_range, port_range + else: + start, end = port_range.split(':', 1) + try: + start, end = int(start), int(end) + if end < start: + raise ValueError + return start, end + except ValueError as ex: + raise EventletBackdoorConfigValueError(port_range, ex, + help_for_backdoor_port) + + +def _listen(host, start_port, end_port, listen_func): + try_port = start_port + while True: + try: + return listen_func((host, try_port)) + except socket.error as exc: + if (exc.errno != errno.EADDRINUSE or try_port >= end_port): + raise + try_port += 1 + + def initialize_if_enabled(): backdoor_locals = { 'exit': _dont_use_this, # So we don't exit the entire process @@ -72,6 +121,8 @@ def initialize_if_enabled(): if CONF.backdoor_port is None: return None + start_port, end_port = _parse_port_range(str(CONF.backdoor_port)) + # NOTE(johannes): The standard sys.displayhook will print the value of # the last expression and set it to __builtin__._, which overwrites # the __builtin__._ that gettext sets. Let's switch to using pprint @@ -82,8 +133,13 @@ def initialize_if_enabled(): pprint.pprint(val) sys.displayhook = displayhook - sock = eventlet.listen(('localhost', CONF.backdoor_port)) + sock = _listen('localhost', start_port, end_port, eventlet.listen) + + # In the case of backdoor port being zero, a port number is assigned by + # listen(). In any case, pull the port number out here. port = sock.getsockname()[1] + LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') % + {'port': port, 'pid': os.getpid()}) eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, locals=backdoor_locals) return port diff --git a/muranoapi/openstack/common/exception.py b/muranoapi/openstack/common/exception.py index d60830b3..d145e804 100644 --- a/muranoapi/openstack/common/exception.py +++ b/muranoapi/openstack/common/exception.py @@ -21,7 +21,7 @@ Exceptions common to OpenStack projects import logging -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa _FATAL_EXCEPTION_FORMAT_ERRORS = False @@ -33,7 +33,7 @@ class Error(Exception): class ApiError(Error): def __init__(self, message='Unknown', code='Unknown'): - self.message = message + self.api_message = message self.code = code super(ApiError, self).__init__('%s: %s' % (code, message)) @@ -44,19 +44,19 @@ class NotFound(Error): class UnknownScheme(Error): - msg = "Unknown scheme '%s' found in URI" + msg_fmt = "Unknown scheme '%s' found in URI" def __init__(self, scheme): - msg = self.__class__.msg % scheme + msg = self.msg_fmt % scheme super(UnknownScheme, self).__init__(msg) class BadStoreUri(Error): - msg = "The Store URI %s was malformed. Reason: %s" + msg_fmt = "The Store URI %s was malformed. Reason: %s" def __init__(self, uri, reason): - msg = self.__class__.msg % (uri, reason) + msg = self.msg_fmt % (uri, reason) super(BadStoreUri, self).__init__(msg) @@ -100,9 +100,7 @@ def wrap_exception(f): return f(*args, **kw) except Exception as e: if not isinstance(e, Error): - #exc_type, exc_value, exc_traceback = sys.exc_info() logging.exception(_('Uncaught exception')) - #logging.error(traceback.extract_stack(exc_traceback)) raise Error(str(e)) raise _wrap.func_name = f.func_name @@ -113,29 +111,29 @@ class OpenstackException(Exception): """Base Exception class. To correctly use this class, inherit from it and define - a 'message' property. That message will get printf'd + a 'msg_fmt' property. That message will get printf'd with the keyword arguments provided to the constructor. """ - message = "An unknown exception occurred" + msg_fmt = "An unknown exception occurred" def __init__(self, **kwargs): try: - self._error_string = self.message % kwargs + self._error_string = self.msg_fmt % kwargs - except Exception as e: + except Exception: if _FATAL_EXCEPTION_FORMAT_ERRORS: - raise e + raise else: # at least get the core message out if something happened - self._error_string = self.message + self._error_string = self.msg_fmt def __str__(self): return self._error_string class MalformedRequestBody(OpenstackException): - message = "Malformed message body: %(reason)s" + msg_fmt = "Malformed message body: %(reason)s" class InvalidContentType(OpenstackException): - message = "Invalid content type %(content_type)s" + msg_fmt = "Invalid content type %(content_type)s" diff --git a/muranoapi/openstack/common/excutils.py b/muranoapi/openstack/common/excutils.py index 10350980..647c5b78 100644 --- a/muranoapi/openstack/common/excutils.py +++ b/muranoapi/openstack/common/excutils.py @@ -19,16 +19,15 @@ Exception related utilities. """ -import contextlib import logging import sys +import time import traceback -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa -@contextlib.contextmanager -def save_and_reraise_exception(): +class save_and_reraise_exception(object): """Save current exception, run some code and then re-raise. In some cases the exception context can be cleared, resulting in None @@ -40,12 +39,60 @@ def save_and_reraise_exception(): To work around this, we save the exception state, run handler code, and then re-raise the original exception. If another exception occurs, the saved exception is logged and the new exception is re-raised. - """ - type_, value, tb = sys.exc_info() - try: - yield + + In some cases the caller may not want to re-raise the exception, and + for those circumstances this context provides a reraise flag that + can be used to suppress the exception. For example: + except Exception: - logging.error(_('Original exception being dropped: %s'), - traceback.format_exception(type_, value, tb)) - raise - raise type_, value, tb + with save_and_reraise_exception() as ctxt: + decide_if_need_reraise() + if not should_be_reraised: + ctxt.reraise = False + """ + def __init__(self): + self.reraise = True + + def __enter__(self): + self.type_, self.value, self.tb, = sys.exc_info() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + logging.error(_('Original exception being dropped: %s'), + traceback.format_exception(self.type_, + self.value, + self.tb)) + return False + if self.reraise: + raise self.type_, self.value, self.tb + + +def forever_retry_uncaught_exceptions(infunc): + def inner_func(*args, **kwargs): + last_log_time = 0 + last_exc_message = None + exc_count = 0 + while True: + try: + return infunc(*args, **kwargs) + except Exception as exc: + if exc.message == last_exc_message: + exc_count += 1 + else: + exc_count = 1 + # Do not log any more frequently than once a minute unless + # the exception message changes + cur_time = int(time.time()) + if (cur_time - last_log_time > 60 or + exc.message != last_exc_message): + logging.exception( + _('Unexpected exception occurred %d time(s)... ' + 'retrying.') % exc_count) + last_log_time = cur_time + last_exc_message = exc.message + exc_count = 0 + # This should be a very rare event. In case it isn't, do + # a sleep. + time.sleep(1) + return inner_func diff --git a/muranoapi/openstack/common/fileutils.py b/muranoapi/openstack/common/fileutils.py index b6c81d2a..63ea8323 100644 --- a/muranoapi/openstack/common/fileutils.py +++ b/muranoapi/openstack/common/fileutils.py @@ -21,7 +21,7 @@ import errno import os from muranoapi.openstack.common import excutils -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import log as logging LOG = logging.getLogger(__name__) diff --git a/muranoapi/openstack/common/gettextutils.py b/muranoapi/openstack/common/gettextutils.py index 88627a82..546a7a50 100644 --- a/muranoapi/openstack/common/gettextutils.py +++ b/muranoapi/openstack/common/gettextutils.py @@ -28,8 +28,11 @@ import copy import gettext import logging.handlers import os +import re import UserString +import six + _localedir = os.environ.get('muranoapi'.upper() + '_LOCALEDIR') _t = gettext.translation('muranoapi', localedir=_localedir, fallback=True) @@ -120,7 +123,29 @@ class Message(UserString.UserString, object): if self.params is not None: full_msg = full_msg % self.params - return unicode(full_msg) + return six.text_type(full_msg) + + def _save_dictionary_parameter(self, dict_param): + full_msg = self.data + # look for %(blah) fields in string; + # ignore %% and deal with the + # case where % is first character on the line + keys = re.findall('(?:[^%]|^)%\((\w*)\)[a-z]', full_msg) + + # if we don't find any %(blah) blocks but have a %s + if not keys and re.findall('(?:[^%]|^)%[a-z]', full_msg): + # apparently the full dictionary is the parameter + params = copy.deepcopy(dict_param) + else: + params = {} + for key in keys: + try: + params[key] = copy.deepcopy(dict_param[key]) + except TypeError: + # cast uncopyable thing to unicode string + params[key] = unicode(dict_param[key]) + + return params def _save_parameters(self, other): # we check for None later to see if @@ -128,8 +153,16 @@ class Message(UserString.UserString, object): # so encapsulate if our parameter is actually None if other is None: self.params = (other, ) + elif isinstance(other, dict): + self.params = self._save_dictionary_parameter(other) else: - self.params = copy.deepcopy(other) + # fallback to casting to unicode, + # this will handle the problematic python code-like + # objects that cannot be deep-copied + try: + self.params = copy.deepcopy(other) + except TypeError: + self.params = unicode(other) return self diff --git a/muranoapi/openstack/common/jsonutils.py b/muranoapi/openstack/common/jsonutils.py index 9f0536b1..6cb8b899 100644 --- a/muranoapi/openstack/common/jsonutils.py +++ b/muranoapi/openstack/common/jsonutils.py @@ -41,6 +41,7 @@ import json import types import xmlrpclib +import netaddr import six from muranoapi.openstack.common import timeutils @@ -137,6 +138,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, # Likely an instance of something. Watch for cycles. # Ignore class member vars. return recursive(value.__dict__, level=level + 1) + elif isinstance(value, netaddr.IPAddress): + return six.text_type(value) else: if any(test(value) for test in _nasty_type_tests): return six.text_type(value) diff --git a/muranoapi/openstack/common/lockutils.py b/muranoapi/openstack/common/lockutils.py index 86e86627..5f316d4d 100644 --- a/muranoapi/openstack/common/lockutils.py +++ b/muranoapi/openstack/common/lockutils.py @@ -16,11 +16,10 @@ # under the License. +import contextlib import errno import functools import os -import shutil -import tempfile import time import weakref @@ -28,7 +27,7 @@ from eventlet import semaphore from oslo.config import cfg from muranoapi.openstack.common import fileutils -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import local from muranoapi.openstack.common import log as logging @@ -40,8 +39,7 @@ util_opts = [ cfg.BoolOpt('disable_process_locking', default=False, help='Whether to disable inter-process locks'), cfg.StrOpt('lock_path', - help=('Directory to use for lock files. Default to a ' - 'temp directory')) + help=('Directory to use for lock files.')) ] @@ -135,7 +133,87 @@ else: _semaphores = weakref.WeakValueDictionary() -def synchronized(name, lock_file_prefix, external=False, lock_path=None): +@contextlib.contextmanager +def lock(name, lock_file_prefix=None, external=False, lock_path=None): + """Context based lock + + This function yields a `semaphore.Semaphore` instance unless external is + True, in which case, it'll yield an InterProcessLock instance. + + :param lock_file_prefix: The lock_file_prefix argument is used to provide + lock files on disk with a meaningful prefix. + + :param external: The external keyword argument denotes whether this lock + should work across multiple processes. This means that if two different + workers both run a a method decorated with @synchronized('mylock', + external=True), only one of them will execute at a time. + + :param lock_path: The lock_path keyword argument is used to specify a + special location for external lock files to live. If nothing is set, then + CONF.lock_path is used as a default. + """ + # NOTE(soren): If we ever go natively threaded, this will be racy. + # See http://stackoverflow.com/questions/5390569/dyn + # amically-allocating-and-destroying-mutexes + sem = _semaphores.get(name, semaphore.Semaphore()) + if name not in _semaphores: + # this check is not racy - we're already holding ref locally + # so GC won't remove the item and there was no IO switch + # (only valid in greenthreads) + _semaphores[name] = sem + + with sem: + LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name}) + + # NOTE(mikal): I know this looks odd + if not hasattr(local.strong_store, 'locks_held'): + local.strong_store.locks_held = [] + local.strong_store.locks_held.append(name) + + try: + if external and not CONF.disable_process_locking: + LOG.debug(_('Attempting to grab file lock "%(lock)s"'), + {'lock': name}) + + # We need a copy of lock_path because it is non-local + local_lock_path = lock_path or CONF.lock_path + if not local_lock_path: + raise cfg.RequiredOptError('lock_path') + + if not os.path.exists(local_lock_path): + fileutils.ensure_tree(local_lock_path) + LOG.info(_('Created lock path: %s'), local_lock_path) + + def add_prefix(name, prefix): + if not prefix: + return name + sep = '' if prefix.endswith('-') else '-' + return '%s%s%s' % (prefix, sep, name) + + # NOTE(mikal): the lock name cannot contain directory + # separators + lock_file_name = add_prefix(name.replace(os.sep, '_'), + lock_file_prefix) + + lock_file_path = os.path.join(local_lock_path, lock_file_name) + + try: + lock = InterProcessLock(lock_file_path) + with lock as lock: + LOG.debug(_('Got file lock "%(lock)s" at %(path)s'), + {'lock': name, 'path': lock_file_path}) + yield lock + finally: + LOG.debug(_('Released file lock "%(lock)s" at %(path)s'), + {'lock': name, 'path': lock_file_path}) + else: + yield sem + + finally: + local.strong_store.locks_held.remove(name) + + +def synchronized(name, lock_file_prefix=None, external=False, lock_path=None): """Synchronization decorator. Decorating a method like so:: @@ -157,99 +235,18 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None): ... This way only one of either foo or bar can be executing at a time. - - :param lock_file_prefix: The lock_file_prefix argument is used to provide - lock files on disk with a meaningful prefix. The prefix should end with a - hyphen ('-') if specified. - - :param external: The external keyword argument denotes whether this lock - should work across multiple processes. This means that if two different - workers both run a a method decorated with @synchronized('mylock', - external=True), only one of them will execute at a time. - - :param lock_path: The lock_path keyword argument is used to specify a - special location for external lock files to live. If nothing is set, then - CONF.lock_path is used as a default. """ def wrap(f): @functools.wraps(f) def inner(*args, **kwargs): - # NOTE(soren): If we ever go natively threaded, this will be racy. - # See http://stackoverflow.com/questions/5390569/dyn - # amically-allocating-and-destroying-mutexes - sem = _semaphores.get(name, semaphore.Semaphore()) - if name not in _semaphores: - # this check is not racy - we're already holding ref locally - # so GC won't remove the item and there was no IO switch - # (only valid in greenthreads) - _semaphores[name] = sem + with lock(name, lock_file_prefix, external, lock_path): + LOG.debug(_('Got semaphore / lock "%(function)s"'), + {'function': f.__name__}) + return f(*args, **kwargs) - with sem: - LOG.debug(_('Got semaphore "%(lock)s" for method ' - '"%(method)s"...'), {'lock': name, - 'method': f.__name__}) - - # NOTE(mikal): I know this looks odd - if not hasattr(local.strong_store, 'locks_held'): - local.strong_store.locks_held = [] - local.strong_store.locks_held.append(name) - - try: - if external and not CONF.disable_process_locking: - LOG.debug(_('Attempting to grab file lock "%(lock)s" ' - 'for method "%(method)s"...'), - {'lock': name, 'method': f.__name__}) - cleanup_dir = False - - # We need a copy of lock_path because it is non-local - local_lock_path = lock_path - if not local_lock_path: - local_lock_path = CONF.lock_path - - if not local_lock_path: - cleanup_dir = True - local_lock_path = tempfile.mkdtemp() - - if not os.path.exists(local_lock_path): - fileutils.ensure_tree(local_lock_path) - - # NOTE(mikal): the lock name cannot contain directory - # separators - safe_name = name.replace(os.sep, '_') - lock_file_name = '%s%s' % (lock_file_prefix, safe_name) - lock_file_path = os.path.join(local_lock_path, - lock_file_name) - - try: - lock = InterProcessLock(lock_file_path) - with lock: - LOG.debug(_('Got file lock "%(lock)s" at ' - '%(path)s for method ' - '"%(method)s"...'), - {'lock': name, - 'path': lock_file_path, - 'method': f.__name__}) - retval = f(*args, **kwargs) - finally: - LOG.debug(_('Released file lock "%(lock)s" at ' - '%(path)s for method "%(method)s"...'), - {'lock': name, - 'path': lock_file_path, - 'method': f.__name__}) - # NOTE(vish): This removes the tempdir if we needed - # to create one. This is used to - # cleanup the locks left behind by unit - # tests. - if cleanup_dir: - shutil.rmtree(local_lock_path) - else: - retval = f(*args, **kwargs) - - finally: - local.strong_store.locks_held.remove(name) - - return retval + LOG.debug(_('Semaphore / lock released "%(function)s"'), + {'function': f.__name__}) return inner return wrap @@ -273,7 +270,7 @@ def synchronized_with_prefix(lock_file_prefix): ... The lock_file_prefix argument is used to provide lock files on disk with a - meaningful prefix. The prefix should end with a hyphen ('-') if specified. + meaningful prefix. """ return functools.partial(synchronized, lock_file_prefix=lock_file_prefix) diff --git a/muranoapi/openstack/common/log.py b/muranoapi/openstack/common/log.py index c7c59b48..006b44cf 100644 --- a/muranoapi/openstack/common/log.py +++ b/muranoapi/openstack/common/log.py @@ -29,8 +29,6 @@ It also allows setting of formatting information through conf. """ -import ConfigParser -import cStringIO import inspect import itertools import logging @@ -41,8 +39,9 @@ import sys import traceback from oslo.config import cfg +from six import moves -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import importutils from muranoapi.openstack.common import jsonutils from muranoapi.openstack.common import local @@ -74,7 +73,8 @@ logging_cli_opts = [ cfg.StrOpt('log-format', default=None, metavar='FORMAT', - help='A logging.Formatter log message format string which may ' + help='DEPRECATED. ' + 'A logging.Formatter log message format string which may ' 'use any of the available logging.LogRecord attributes. ' 'This option is deprecated. Please use ' 'logging_context_format_string and ' @@ -347,7 +347,7 @@ class LogConfigError(Exception): def _load_log_config(log_config): try: logging.config.fileConfig(log_config) - except ConfigParser.Error as exc: + except moves.configparser.Error as exc: raise LogConfigError(log_config, str(exc)) @@ -520,7 +520,7 @@ class ContextFormatter(logging.Formatter): if not record: return logging.Formatter.formatException(self, exc_info) - stringbuffer = cStringIO.StringIO() + stringbuffer = moves.StringIO() traceback.print_exception(exc_info[0], exc_info[1], exc_info[2], None, stringbuffer) lines = stringbuffer.getvalue().split('\n') diff --git a/muranoapi/openstack/common/loopingcall.py b/muranoapi/openstack/common/loopingcall.py index c1b39b18..ded7a3a4 100644 --- a/muranoapi/openstack/common/loopingcall.py +++ b/muranoapi/openstack/common/loopingcall.py @@ -22,7 +22,7 @@ import sys from eventlet import event from eventlet import greenthread -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import timeutils diff --git a/muranoapi/openstack/common/network_utils.py b/muranoapi/openstack/common/network_utils.py index 88c313ee..dbed1ceb 100644 --- a/muranoapi/openstack/common/network_utils.py +++ b/muranoapi/openstack/common/network_utils.py @@ -19,10 +19,7 @@ Network-related utilities and helper functions. """ -from muranoapi.openstack.common import log as logging - - -LOG = logging.getLogger(__name__) +import urlparse def parse_host_port(address, default_port=None): @@ -67,3 +64,18 @@ def parse_host_port(address, default_port=None): port = default_port return (host, None if port is None else int(port)) + + +def urlsplit(url, scheme='', allow_fragments=True): + """Parse a URL using urlparse.urlsplit(), splitting query and fragments. + This function papers over Python issue9374 when needed. + + The parameters are the same as urlparse.urlsplit. + """ + scheme, netloc, path, query, fragment = urlparse.urlsplit( + url, scheme, allow_fragments) + if allow_fragments and '#' in path: + path, fragment = path.split('#', 1) + if '?' in path: + path, query = path.split('?', 1) + return urlparse.SplitResult(scheme, netloc, path, query, fragment) diff --git a/muranoapi/openstack/common/notifier/api.py b/muranoapi/openstack/common/notifier/api.py index 21695713..33fcdbf2 100644 --- a/muranoapi/openstack/common/notifier/api.py +++ b/muranoapi/openstack/common/notifier/api.py @@ -13,12 +13,13 @@ # License for the specific language governing permissions and limitations # under the License. +import socket import uuid from oslo.config import cfg from muranoapi.openstack.common import context -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import importutils from muranoapi.openstack.common import jsonutils from muranoapi.openstack.common import log as logging @@ -35,7 +36,7 @@ notifier_opts = [ default='INFO', help='Default notification level for outgoing notifications'), cfg.StrOpt('default_publisher_id', - default='$host', + default=None, help='Default publisher_id for outgoing notifications'), ] @@ -74,7 +75,7 @@ def notify_decorator(name, fn): ctxt = context.get_context_from_function_and_args(fn, args, kwarg) notify(ctxt, - CONF.default_publisher_id, + CONF.default_publisher_id or socket.gethostname(), name, CONF.default_notification_level, body) @@ -84,7 +85,10 @@ def notify_decorator(name, fn): def publisher_id(service, host=None): if not host: - host = CONF.host + try: + host = CONF.host + except AttributeError: + host = CONF.default_publisher_id or socket.gethostname() return "%s.%s" % (service, host) @@ -153,29 +157,16 @@ def _get_drivers(): if _drivers is None: _drivers = {} for notification_driver in CONF.notification_driver: - add_driver(notification_driver) - + try: + driver = importutils.import_module(notification_driver) + _drivers[notification_driver] = driver + except ImportError: + LOG.exception(_("Failed to load notifier %s. " + "These notifications will not be sent.") % + notification_driver) return _drivers.values() -def add_driver(notification_driver): - """Add a notification driver at runtime.""" - # Make sure the driver list is initialized. - _get_drivers() - if isinstance(notification_driver, basestring): - # Load and add - try: - driver = importutils.import_module(notification_driver) - _drivers[notification_driver] = driver - except ImportError: - LOG.exception(_("Failed to load notifier %s. " - "These notifications will not be sent.") % - notification_driver) - else: - # Driver is already loaded; just add the object. - _drivers[notification_driver] = notification_driver - - def _reset_drivers(): """Used by unit tests to reset the drivers.""" global _drivers diff --git a/muranoapi/openstack/common/notifier/rabbit_notifier.py b/muranoapi/openstack/common/notifier/rabbit_notifier.py deleted file mode 100644 index 22ea1241..00000000 --- a/muranoapi/openstack/common/notifier/rabbit_notifier.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2012 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -from muranoapi.openstack.common.gettextutils import _ -from muranoapi.openstack.common import log as logging -from muranoapi.openstack.common.notifier import rpc_notifier - -LOG = logging.getLogger(__name__) - - -def notify(context, message): - """Deprecated in Grizzly. Please use rpc_notifier instead.""" - - LOG.deprecated(_("The rabbit_notifier is now deprecated." - " Please use rpc_notifier instead.")) - rpc_notifier.notify(context, message) diff --git a/muranoapi/openstack/common/notifier/rpc_notifier.py b/muranoapi/openstack/common/notifier/rpc_notifier.py index 26b4e4ec..66965a8e 100644 --- a/muranoapi/openstack/common/notifier/rpc_notifier.py +++ b/muranoapi/openstack/common/notifier/rpc_notifier.py @@ -16,7 +16,7 @@ from oslo.config import cfg from muranoapi.openstack.common import context as req_context -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import rpc diff --git a/muranoapi/openstack/common/notifier/rpc_notifier2.py b/muranoapi/openstack/common/notifier/rpc_notifier2.py index 0388b0a9..db532886 100644 --- a/muranoapi/openstack/common/notifier/rpc_notifier2.py +++ b/muranoapi/openstack/common/notifier/rpc_notifier2.py @@ -18,7 +18,7 @@ from oslo.config import cfg from muranoapi.openstack.common import context as req_context -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import rpc diff --git a/muranoapi/openstack/common/rpc/__init__.py b/muranoapi/openstack/common/rpc/__init__.py index 2f3b315d..7a1a85cb 100644 --- a/muranoapi/openstack/common/rpc/__init__.py +++ b/muranoapi/openstack/common/rpc/__init__.py @@ -29,7 +29,7 @@ import inspect from oslo.config import cfg -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import importutils from muranoapi.openstack.common import local from muranoapi.openstack.common import log as logging diff --git a/muranoapi/openstack/common/rpc/amqp.py b/muranoapi/openstack/common/rpc/amqp.py index 2b7d511b..d4b5ec84 100644 --- a/muranoapi/openstack/common/rpc/amqp.py +++ b/muranoapi/openstack/common/rpc/amqp.py @@ -34,14 +34,28 @@ from eventlet import greenpool from eventlet import pools from eventlet import queue from eventlet import semaphore +from oslo.config import cfg from muranoapi.openstack.common import excutils -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import local from muranoapi.openstack.common import log as logging from muranoapi.openstack.common.rpc import common as rpc_common +amqp_opts = [ + cfg.BoolOpt('amqp_durable_queues', + default=False, + deprecated_name='rabbit_durable_queues', + deprecated_group='DEFAULT', + help='Use durable queues in amqp.'), + cfg.BoolOpt('amqp_auto_delete', + default=False, + help='Auto-delete queues in amqp.'), +] + +cfg.CONF.register_opts(amqp_opts) + UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) @@ -151,11 +165,13 @@ class ConnectionContext(rpc_common.Connection): def create_worker(self, topic, proxy, pool_name): self.connection.create_worker(topic, proxy, pool_name) - def join_consumer_pool(self, callback, pool_name, topic, exchange_name): + def join_consumer_pool(self, callback, pool_name, topic, exchange_name, + ack_on_error=True): self.connection.join_consumer_pool(callback, pool_name, topic, - exchange_name) + exchange_name, + ack_on_error) def consume_in_thread(self): self.connection.consume_in_thread() @@ -219,12 +235,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None, failure = rpc_common.serialize_remote_exception(failure, log_failure) - try: - msg = {'result': reply, 'failure': failure} - except TypeError: - msg = {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure} + msg = {'result': reply, 'failure': failure} if ending: msg['ending'] = True _add_unique_id(msg) diff --git a/muranoapi/openstack/common/rpc/common.py b/muranoapi/openstack/common/rpc/common.py index dab9a60f..eaf126c0 100644 --- a/muranoapi/openstack/common/rpc/common.py +++ b/muranoapi/openstack/common/rpc/common.py @@ -24,7 +24,7 @@ import traceback from oslo.config import cfg import six -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import importutils from muranoapi.openstack.common import jsonutils from muranoapi.openstack.common import local @@ -74,14 +74,14 @@ _REMOTE_POSTFIX = '_Remote' class RPCException(Exception): - message = _("An unknown RPC related exception occurred.") + msg_fmt = _("An unknown RPC related exception occurred.") def __init__(self, message=None, **kwargs): self.kwargs = kwargs if not message: try: - message = self.message % kwargs + message = self.msg_fmt % kwargs except Exception: # kwargs doesn't match a variable in the message @@ -90,7 +90,7 @@ class RPCException(Exception): for name, value in kwargs.iteritems(): LOG.error("%s: %s" % (name, value)) # at least get the core message out if something happened - message = self.message + message = self.msg_fmt super(RPCException, self).__init__(message) @@ -104,7 +104,7 @@ class RemoteError(RPCException): contains all of the relevant info. """ - message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") + msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") def __init__(self, exc_type=None, value=None, traceback=None): self.exc_type = exc_type @@ -121,7 +121,7 @@ class Timeout(RPCException): This exception is raised if the rpc_response_timeout is reached while waiting for a response from the remote side. """ - message = _('Timeout while waiting on RPC response - ' + msg_fmt = _('Timeout while waiting on RPC response - ' 'topic: "%(topic)s", RPC method: "%(method)s" ' 'info: "%(info)s"') @@ -144,25 +144,25 @@ class Timeout(RPCException): class DuplicateMessageError(RPCException): - message = _("Found duplicate message(%(msg_id)s). Skipping it.") + msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.") class InvalidRPCConnectionReuse(RPCException): - message = _("Invalid reuse of an RPC connection.") + msg_fmt = _("Invalid reuse of an RPC connection.") class UnsupportedRpcVersion(RPCException): - message = _("Specified RPC version, %(version)s, not supported by " + msg_fmt = _("Specified RPC version, %(version)s, not supported by " "this endpoint.") class UnsupportedRpcEnvelopeVersion(RPCException): - message = _("Specified RPC envelope version, %(version)s, " + msg_fmt = _("Specified RPC envelope version, %(version)s, " "not supported by this endpoint.") class RpcVersionCapError(RPCException): - message = _("Specified RPC version cap, %(version_cap)s, is too low") + msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low") class Connection(object): @@ -261,41 +261,20 @@ class Connection(object): def _safe_log(log_func, msg, msg_data): """Sanitizes the msg_data field before logging.""" - SANITIZE = {'set_admin_password': [('args', 'new_pass')], - 'run_instance': [('args', 'admin_password')], - 'route_message': [('args', 'message', 'args', 'method_info', - 'method_kwargs', 'password'), - ('args', 'message', 'args', 'method_info', - 'method_kwargs', 'admin_password')]} + SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass'] - has_method = 'method' in msg_data and msg_data['method'] in SANITIZE - has_context_token = '_context_auth_token' in msg_data - has_token = 'auth_token' in msg_data + def _fix_passwords(d): + """Sanitizes the password fields in the dictionary.""" + for k in d.iterkeys(): + if k.lower().find('password') != -1: + d[k] = '' + elif k.lower() in SANITIZE: + d[k] = '' + elif isinstance(d[k], dict): + _fix_passwords(d[k]) + return d - if not any([has_method, has_context_token, has_token]): - return log_func(msg, msg_data) - - msg_data = copy.deepcopy(msg_data) - - if has_method: - for arg in SANITIZE.get(msg_data['method'], []): - try: - d = msg_data - for elem in arg[:-1]: - d = d[elem] - d[arg[-1]] = '' - except KeyError as e: - LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'), - {'item': arg, - 'err': e}) - - if has_context_token: - msg_data['_context_auth_token'] = '' - - if has_token: - msg_data['auth_token'] = '' - - return log_func(msg, msg_data) + return log_func(msg, _fix_passwords(copy.deepcopy(msg_data))) def serialize_remote_exception(failure_info, log_failure=True): diff --git a/muranoapi/openstack/common/rpc/impl_kombu.py b/muranoapi/openstack/common/rpc/impl_kombu.py index f34c6ff5..d391b51f 100644 --- a/muranoapi/openstack/common/rpc/impl_kombu.py +++ b/muranoapi/openstack/common/rpc/impl_kombu.py @@ -18,7 +18,6 @@ import functools import itertools import socket import ssl -import sys import time import uuid @@ -30,15 +29,20 @@ import kombu.entity import kombu.messaging from oslo.config import cfg -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common import excutils +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import network_utils from muranoapi.openstack.common.rpc import amqp as rpc_amqp from muranoapi.openstack.common.rpc import common as rpc_common +from muranoapi.openstack.common import sslutils kombu_opts = [ cfg.StrOpt('kombu_ssl_version', default='', - help='SSL version to use (valid only if SSL enabled)'), + help='SSL version to use (valid only if SSL enabled). ' + 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may ' + 'be available on some distributions' + ), cfg.StrOpt('kombu_ssl_keyfile', default='', help='SSL key file (valid only if SSL enabled)'), @@ -82,9 +86,6 @@ kombu_opts = [ default=0, help='maximum retries with trying to connect to RabbitMQ ' '(the default of 0 implies an infinite retry count)'), - cfg.BoolOpt('rabbit_durable_queues', - default=False, - help='use durable queues in RabbitMQ'), cfg.BoolOpt('rabbit_ha_queues', default=False, help='use H/A queues in RabbitMQ (x-ha-policy: all).' @@ -129,6 +130,7 @@ class ConsumerBase(object): self.tag = str(tag) self.kwargs = kwargs self.queue = None + self.ack_on_error = kwargs.get('ack_on_error', True) self.reconnect(channel) def reconnect(self, channel): @@ -138,6 +140,36 @@ class ConsumerBase(object): self.queue = kombu.entity.Queue(**self.kwargs) self.queue.declare() + def _callback_handler(self, message, callback): + """Call callback with deserialized message. + + Messages that are processed without exception are ack'ed. + + If the message processing generates an exception, it will be + ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed. + Rejection is better than waiting for the message to timeout. + Rejected messages are immediately requeued. + """ + + ack_msg = False + try: + msg = rpc_common.deserialize_msg(message.payload) + callback(msg) + ack_msg = True + except Exception: + if self.ack_on_error: + ack_msg = True + LOG.exception(_("Failed to process message" + " ... skipping it.")) + else: + LOG.exception(_("Failed to process message" + " ... will requeue.")) + finally: + if ack_msg: + message.ack() + else: + message.reject() + def consume(self, *args, **kwargs): """Actually declare the consumer on the amqp channel. This will start the flow of messages from the queue. Using the @@ -150,8 +182,6 @@ class ConsumerBase(object): If kwargs['nowait'] is True, then this call will block until a message is read. - Messages will automatically be acked if the callback doesn't - raise an exception """ options = {'consumer_tag': self.tag} @@ -162,13 +192,7 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) - try: - msg = rpc_common.deserialize_msg(message.payload) - callback(msg) - except Exception: - LOG.exception(_("Failed to process message... skipping it.")) - finally: - message.ack() + self._callback_handler(message, callback) self.queue.consume(*args, callback=_callback, **options) @@ -233,9 +257,9 @@ class TopicConsumer(ConsumerBase): Other kombu options may be passed as keyword arguments """ # Default options - options = {'durable': conf.rabbit_durable_queues, + options = {'durable': conf.amqp_durable_queues, 'queue_arguments': _get_queue_arguments(conf), - 'auto_delete': False, + 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} options.update(kwargs) exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) @@ -339,8 +363,8 @@ class TopicPublisher(Publisher): Kombu options may be passed as keyword args to override defaults """ - options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, + options = {'durable': conf.amqp_durable_queues, + 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} options.update(kwargs) exchange_name = rpc_amqp.get_control_exchange(conf) @@ -370,7 +394,7 @@ class NotifyPublisher(TopicPublisher): """Publisher class for 'notify'.""" def __init__(self, conf, channel, topic, **kwargs): - self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + self.durable = kwargs.pop('durable', conf.amqp_durable_queues) self.queue_arguments = _get_queue_arguments(conf) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) @@ -454,7 +478,8 @@ class Connection(object): # http://docs.python.org/library/ssl.html - ssl.wrap_socket if self.conf.kombu_ssl_version: - ssl_params['ssl_version'] = self.conf.kombu_ssl_version + ssl_params['ssl_version'] = sslutils.validate_ssl_version( + self.conf.kombu_ssl_version) if self.conf.kombu_ssl_keyfile: ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile if self.conf.kombu_ssl_certfile: @@ -537,13 +562,11 @@ class Connection(object): log_info.update(params) if self.max_retries and attempt == self.max_retries: - LOG.error(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) - # NOTE(comstud): Copied from original code. There's - # really no better recourse because if this was a queue we - # need to consume on, we have no way to consume anymore. - sys.exit(1) + msg = _('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info + LOG.error(msg) + raise rpc_common.RPCException(msg) if attempt == 1: sleep_time = self.interval_start or 1 @@ -635,8 +658,8 @@ class Connection(object): def _consume(): if info['do_consume']: - queues_head = self.consumers[:-1] - queues_tail = self.consumers[-1] + queues_head = self.consumers[:-1] # not fanout. + queues_tail = self.consumers[-1] # fanout for queue in queues_head: queue.consume(nowait=True) queues_tail.consume(nowait=False) @@ -685,11 +708,12 @@ class Connection(object): self.declare_consumer(DirectConsumer, topic, callback) def declare_topic_consumer(self, topic, callback=None, queue_name=None, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, exchange_name=exchange_name, + ack_on_error=ack_on_error, ), topic, callback) @@ -724,6 +748,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() @@ -754,7 +779,7 @@ class Connection(object): self.declare_topic_consumer(topic, proxy_cb, pool_name) def join_consumer_pool(self, callback, pool_name, topic, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Register as a member of a group of consumers for a given topic from the specified exchange. @@ -775,6 +800,7 @@ class Connection(object): topic=topic, exchange_name=exchange_name, callback=callback_wrapper, + ack_on_error=ack_on_error, ) diff --git a/muranoapi/openstack/common/rpc/impl_qpid.py b/muranoapi/openstack/common/rpc/impl_qpid.py index fa73b37f..68526d36 100644 --- a/muranoapi/openstack/common/rpc/impl_qpid.py +++ b/muranoapi/openstack/common/rpc/impl_qpid.py @@ -24,7 +24,8 @@ import eventlet import greenlet from oslo.config import cfg -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common import excutils +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import importutils from muranoapi.openstack.common import jsonutils from muranoapi.openstack.common import log as logging @@ -118,10 +119,17 @@ class ConsumerBase(object): self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) - self.reconnect(session) + self.connect(session) + + def connect(self, session): + """Declare the reciever on connect.""" + self._declare_receiver(session) def reconnect(self, session): """Re-declare the receiver after a qpid reconnect.""" + self._declare_receiver(session) + + def _declare_receiver(self, session): self.session = session self.receiver = session.receiver(self.address) self.receiver.capacity = 1 @@ -152,11 +160,15 @@ class ConsumerBase(object): except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: + # TODO(sandy): Need support for optional ack_on_error. self.session.acknowledge(message) def get_receiver(self): return self.receiver + def get_node_name(self): + return self.address.split(';')[0] + class DirectConsumer(ConsumerBase): """Queue/consumer class for 'direct'.""" @@ -169,11 +181,16 @@ class DirectConsumer(ConsumerBase): 'callback' is the callback to call when messages are received """ - super(DirectConsumer, self).__init__(session, callback, - "%s/%s" % (msg_id, msg_id), - {"type": "direct"}, - msg_id, - {"exclusive": True}) + super(DirectConsumer, self).__init__( + session, callback, + "%s/%s" % (msg_id, msg_id), + {"type": "direct"}, + msg_id, + { + "auto-delete": conf.amqp_auto_delete, + "exclusive": True, + "durable": conf.amqp_durable_queues, + }) class TopicConsumer(ConsumerBase): @@ -191,9 +208,14 @@ class TopicConsumer(ConsumerBase): """ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) - super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (exchange_name, topic), - {}, name or topic, {}) + super(TopicConsumer, self).__init__( + session, callback, + "%s/%s" % (exchange_name, topic), + {}, name or topic, + { + "auto-delete": conf.amqp_auto_delete, + "durable": conf.amqp_durable_queues, + }) class FanoutConsumer(ConsumerBase): @@ -206,6 +228,7 @@ class FanoutConsumer(ConsumerBase): 'topic' is the topic to listen on 'callback' is the callback to call when messages are received """ + self.conf = conf super(FanoutConsumer, self).__init__( session, callback, @@ -214,6 +237,18 @@ class FanoutConsumer(ConsumerBase): "%s_fanout_%s" % (topic, uuid.uuid4().hex), {"exclusive": True}) + def reconnect(self, session): + topic = self.get_node_name().rpartition('_fanout')[0] + params = { + 'session': session, + 'topic': topic, + 'callback': self.callback, + } + + self.__init__(conf=self.conf, **params) + + super(FanoutConsumer, self).reconnect(session) + class Publisher(object): """Base Publisher class.""" @@ -575,6 +610,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() @@ -615,7 +651,7 @@ class Connection(object): return consumer def join_consumer_pool(self, callback, pool_name, topic, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Register as a member of a group of consumers for a given topic from the specified exchange. diff --git a/muranoapi/openstack/common/rpc/impl_zmq.py b/muranoapi/openstack/common/rpc/impl_zmq.py index 535b1ae3..0d4b7c16 100644 --- a/muranoapi/openstack/common/rpc/impl_zmq.py +++ b/muranoapi/openstack/common/rpc/impl_zmq.py @@ -27,7 +27,7 @@ import greenlet from oslo.config import cfg from muranoapi.openstack.common import excutils -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import importutils from muranoapi.openstack.common import jsonutils from muranoapi.openstack.common.rpc import common as rpc_common @@ -358,7 +358,6 @@ class ZmqBaseReactor(ConsumerBase): def __init__(self, conf): super(ZmqBaseReactor, self).__init__() - self.mapping = {} self.proxies = {} self.threads = [] self.sockets = [] @@ -366,9 +365,8 @@ class ZmqBaseReactor(ConsumerBase): self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size) - def register(self, proxy, in_addr, zmq_type_in, out_addr=None, - zmq_type_out=None, in_bind=True, out_bind=True, - subscribe=None): + def register(self, proxy, in_addr, zmq_type_in, + in_bind=True, subscribe=None): LOG.info(_("Registering reactor")) @@ -384,21 +382,6 @@ class ZmqBaseReactor(ConsumerBase): LOG.info(_("In reactor registered")) - if not out_addr: - return - - if zmq_type_out not in (zmq.PUSH, zmq.PUB): - raise RPCException("Bad output socktype") - - # Items push out. - outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind) - - self.mapping[inq] = outq - self.mapping[outq] = inq - self.sockets.append(outq) - - LOG.info(_("Out reactor registered")) - def consume_in_thread(self): def _consume(sock): LOG.info(_("Consuming socket")) @@ -516,8 +499,7 @@ class ZmqProxy(ZmqBaseReactor): try: self.register(consumption_proxy, consume_in, - zmq.PULL, - out_bind=True) + zmq.PULL) except zmq.ZMQError: if os.access(ipc_dir, os.X_OK): with excutils.save_and_reraise_exception(): @@ -559,11 +541,6 @@ class ZmqReactor(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data) - if sock in self.mapping: - LOG.debug(_("ROUTER RELAY-OUT %(data)s") % { - 'data': data}) - self.mapping[sock].send(data) - return proxy = self.proxies[sock] diff --git a/muranoapi/openstack/common/rpc/matchmaker.py b/muranoapi/openstack/common/rpc/matchmaker.py index f10a1726..50f11c5c 100644 --- a/muranoapi/openstack/common/rpc/matchmaker.py +++ b/muranoapi/openstack/common/rpc/matchmaker.py @@ -23,7 +23,7 @@ import contextlib import eventlet from oslo.config import cfg -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import log as logging diff --git a/muranoapi/openstack/common/rpc/matchmaker_ring.py b/muranoapi/openstack/common/rpc/matchmaker_ring.py index 4117c4b7..4dfd5e77 100644 --- a/muranoapi/openstack/common/rpc/matchmaker_ring.py +++ b/muranoapi/openstack/common/rpc/matchmaker_ring.py @@ -23,7 +23,7 @@ import json from oslo.config import cfg -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import log as logging from muranoapi.openstack.common.rpc import matchmaker as mm diff --git a/muranoapi/openstack/common/rpc/proxy.py b/muranoapi/openstack/common/rpc/proxy.py index 3aa79ca8..3998de17 100644 --- a/muranoapi/openstack/common/rpc/proxy.py +++ b/muranoapi/openstack/common/rpc/proxy.py @@ -69,7 +69,7 @@ class RpcProxy(object): v = vers if vers else self.default_version if (self.version_cap and not rpc_common.version_is_compatible(self.version_cap, v)): - raise rpc_common.RpcVersionCapError(version=self.version_cap) + raise rpc_common.RpcVersionCapError(version_cap=self.version_cap) msg['version'] = v def _get_topic(self, topic): diff --git a/muranoapi/openstack/common/rpc/service.py b/muranoapi/openstack/common/rpc/service.py index 44fe5454..d35e77ad 100644 --- a/muranoapi/openstack/common/rpc/service.py +++ b/muranoapi/openstack/common/rpc/service.py @@ -17,7 +17,7 @@ # License for the specific language governing permissions and limitations # under the License. -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import rpc from muranoapi.openstack.common.rpc import dispatcher as rpc_dispatcher @@ -32,10 +32,11 @@ class Service(service.Service): A service enables rpc by listening to queues based on topic and host. """ - def __init__(self, host, topic, manager=None): + def __init__(self, host, topic, manager=None, serializer=None): super(Service, self).__init__() self.host = host self.topic = topic + self.serializer = serializer if manager is None: self.manager = self else: @@ -48,7 +49,8 @@ class Service(service.Service): LOG.debug(_("Creating Consumer connection for Service %s") % self.topic) - dispatcher = rpc_dispatcher.RpcDispatcher([self.manager]) + dispatcher = rpc_dispatcher.RpcDispatcher([self.manager], + self.serializer) # Share this same connection for these Consumers self.conn.create_consumer(self.topic, dispatcher, fanout=False) diff --git a/muranoapi/openstack/common/service.py b/muranoapi/openstack/common/service.py index 79b2718c..49e7a9e3 100644 --- a/muranoapi/openstack/common/service.py +++ b/muranoapi/openstack/common/service.py @@ -27,11 +27,12 @@ import sys import time import eventlet +from eventlet import event import logging as std_logging from oslo.config import cfg from muranoapi.openstack.common import eventlet_backdoor -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa from muranoapi.openstack.common import importutils from muranoapi.openstack.common import log as logging from muranoapi.openstack.common import threadgroup @@ -51,20 +52,9 @@ class Launcher(object): :returns: None """ - self._services = threadgroup.ThreadGroup() + self.services = Services() self.backdoor_port = eventlet_backdoor.initialize_if_enabled() - @staticmethod - def run_service(service): - """Start and wait for a service to finish. - - :param service: service to run and wait for. - :returns: None - - """ - service.start() - service.wait() - def launch_service(self, service): """Load and start the given service. @@ -73,7 +63,7 @@ class Launcher(object): """ service.backdoor_port = self.backdoor_port - self._services.add_thread(self.run_service, service) + self.services.add(service) def stop(self): """Stop all services which are currently running. @@ -81,7 +71,7 @@ class Launcher(object): :returns: None """ - self._services.stop() + self.services.stop() def wait(self): """Waits until all services have been stopped, and then returns. @@ -89,7 +79,7 @@ class Launcher(object): :returns: None """ - self._services.wait() + self.services.wait() class SignalExit(SystemExit): @@ -124,9 +114,13 @@ class ServiceLauncher(Launcher): except SystemExit as exc: status = exc.code finally: - if rpc: - rpc.cleanup() self.stop() + if rpc: + try: + rpc.cleanup() + except Exception: + # We're shutting down, so it doesn't matter at this point. + LOG.exception(_('Exception during rpc cleanup.')) return status @@ -189,7 +183,8 @@ class ProcessLauncher(object): random.seed() launcher = Launcher() - launcher.run_service(service) + launcher.launch_service(service) + launcher.wait() def _start_child(self, wrap): if len(wrap.forktimes) > wrap.workers: @@ -313,15 +308,63 @@ class Service(object): def __init__(self, threads=1000): self.tg = threadgroup.ThreadGroup(threads) + # signal that the service is done shutting itself down: + self._done = event.Event() + def start(self): pass def stop(self): self.tg.stop() + self.tg.wait() + # Signal that service cleanup is done: + if not self._done.ready(): + self._done.send() + + def wait(self): + self._done.wait() + + +class Services(object): + + def __init__(self): + self.services = [] + self.tg = threadgroup.ThreadGroup() + self.done = event.Event() + + def add(self, service): + self.services.append(service) + self.tg.add_thread(self.run_service, service, self.done) + + def stop(self): + # wait for graceful shutdown of services: + for service in self.services: + service.stop() + service.wait() + + # Each service has performed cleanup, now signal that the run_service + # wrapper threads can now die: + if not self.done.ready(): + self.done.send() + + # reap threads: + self.tg.stop() def wait(self): self.tg.wait() + @staticmethod + def run_service(service, done): + """Service start wrapper. + + :param service: service to run + :param done: event to wait on until a shutdown is triggered + :returns: None + + """ + service.start() + done.wait() + def launch(service, workers=None): if workers: diff --git a/muranoapi/openstack/common/setup.py b/muranoapi/openstack/common/setup.py deleted file mode 100644 index dec74fd0..00000000 --- a/muranoapi/openstack/common/setup.py +++ /dev/null @@ -1,367 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011 OpenStack Foundation. -# Copyright 2012-2013 Hewlett-Packard Development Company, L.P. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Utilities with minimum-depends for use in setup.py -""" - -import email -import os -import re -import subprocess -import sys - -from setuptools.command import sdist - - -def parse_mailmap(mailmap='.mailmap'): - mapping = {} - if os.path.exists(mailmap): - with open(mailmap, 'r') as fp: - for l in fp: - try: - canonical_email, alias = re.match( - r'[^#]*?(<.+>).*(<.+>).*', l).groups() - except AttributeError: - continue - mapping[alias] = canonical_email - return mapping - - -def _parse_git_mailmap(git_dir, mailmap='.mailmap'): - mailmap = os.path.join(os.path.dirname(git_dir), mailmap) - return parse_mailmap(mailmap) - - -def canonicalize_emails(changelog, mapping): - """Takes in a string and an email alias mapping and replaces all - instances of the aliases in the string with their real email. - """ - for alias, email_address in mapping.iteritems(): - changelog = changelog.replace(alias, email_address) - return changelog - - -# Get requirements from the first file that exists -def get_reqs_from_files(requirements_files): - for requirements_file in requirements_files: - if os.path.exists(requirements_file): - with open(requirements_file, 'r') as fil: - return fil.read().split('\n') - return [] - - -def parse_requirements(requirements_files=['requirements.txt', - 'tools/pip-requires']): - requirements = [] - for line in get_reqs_from_files(requirements_files): - # For the requirements list, we need to inject only the portion - # after egg= so that distutils knows the package it's looking for - # such as: - # -e git://github.com/openstack/nova/master#egg=nova - if re.match(r'\s*-e\s+', line): - requirements.append(re.sub(r'\s*-e\s+.*#egg=(.*)$', r'\1', - line)) - # such as: - # http://github.com/openstack/nova/zipball/master#egg=nova - elif re.match(r'\s*https?:', line): - requirements.append(re.sub(r'\s*https?:.*#egg=(.*)$', r'\1', - line)) - # -f lines are for index locations, and don't get used here - elif re.match(r'\s*-f\s+', line): - pass - # argparse is part of the standard library starting with 2.7 - # adding it to the requirements list screws distro installs - elif line == 'argparse' and sys.version_info >= (2, 7): - pass - else: - requirements.append(line) - - return requirements - - -def parse_dependency_links(requirements_files=['requirements.txt', - 'tools/pip-requires']): - dependency_links = [] - # dependency_links inject alternate locations to find packages listed - # in requirements - for line in get_reqs_from_files(requirements_files): - # skip comments and blank lines - if re.match(r'(\s*#)|(\s*$)', line): - continue - # lines with -e or -f need the whole line, minus the flag - if re.match(r'\s*-[ef]\s+', line): - dependency_links.append(re.sub(r'\s*-[ef]\s+', '', line)) - # lines that are only urls can go in unmolested - elif re.match(r'\s*https?:', line): - dependency_links.append(line) - return dependency_links - - -def _run_shell_command(cmd, throw_on_error=False): - if os.name == 'nt': - output = subprocess.Popen(["cmd.exe", "/C", cmd], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - else: - output = subprocess.Popen(["/bin/sh", "-c", cmd], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - out = output.communicate() - if output.returncode and throw_on_error: - raise Exception("%s returned %d" % cmd, output.returncode) - if len(out) == 0: - return None - if len(out[0].strip()) == 0: - return None - return out[0].strip() - - -def _get_git_directory(): - parent_dir = os.path.dirname(__file__) - while True: - git_dir = os.path.join(parent_dir, '.git') - if os.path.exists(git_dir): - return git_dir - parent_dir, child = os.path.split(parent_dir) - if not child: # reached to root dir - return None - - -def write_git_changelog(): - """Write a changelog based on the git changelog.""" - new_changelog = 'ChangeLog' - git_dir = _get_git_directory() - if not os.getenv('SKIP_WRITE_GIT_CHANGELOG'): - if git_dir: - git_log_cmd = 'git --git-dir=%s log' % git_dir - changelog = _run_shell_command(git_log_cmd) - mailmap = _parse_git_mailmap(git_dir) - with open(new_changelog, "w") as changelog_file: - changelog_file.write(canonicalize_emails(changelog, mailmap)) - else: - open(new_changelog, 'w').close() - - -def generate_authors(): - """Create AUTHORS file using git commits.""" - jenkins_email = 'jenkins@review.(openstack|stackforge).org' - old_authors = 'AUTHORS.in' - new_authors = 'AUTHORS' - git_dir = _get_git_directory() - if not os.getenv('SKIP_GENERATE_AUTHORS'): - if git_dir: - # don't include jenkins email address in AUTHORS file - git_log_cmd = ("git --git-dir=" + git_dir + - " log --format='%aN <%aE>' | sort -u | " - "egrep -v '" + jenkins_email + "'") - changelog = _run_shell_command(git_log_cmd) - signed_cmd = ("git log --git-dir=" + git_dir + - " | grep -i Co-authored-by: | sort -u") - signed_entries = _run_shell_command(signed_cmd) - if signed_entries: - new_entries = "\n".join( - [signed.split(":", 1)[1].strip() - for signed in signed_entries.split("\n") if signed]) - changelog = "\n".join((changelog, new_entries)) - mailmap = _parse_git_mailmap(git_dir) - with open(new_authors, 'w') as new_authors_fh: - new_authors_fh.write(canonicalize_emails(changelog, mailmap)) - if os.path.exists(old_authors): - with open(old_authors, "r") as old_authors_fh: - new_authors_fh.write('\n' + old_authors_fh.read()) - else: - open(new_authors, 'w').close() - - -_rst_template = """%(heading)s -%(underline)s - -.. automodule:: %(module)s - :members: - :undoc-members: - :show-inheritance: -""" - - -def get_cmdclass(): - """Return dict of commands to run from setup.py.""" - - cmdclass = dict() - - def _find_modules(arg, dirname, files): - for filename in files: - if filename.endswith('.py') and filename != '__init__.py': - arg["%s.%s" % (dirname.replace('/', '.'), - filename[:-3])] = True - - class LocalSDist(sdist.sdist): - """Builds the ChangeLog and Authors files from VC first.""" - - def run(self): - write_git_changelog() - generate_authors() - # sdist.sdist is an old style class, can't use super() - sdist.sdist.run(self) - - cmdclass['sdist'] = LocalSDist - - # If Sphinx is installed on the box running setup.py, - # enable setup.py to build the documentation, otherwise, - # just ignore it - try: - from sphinx.setup_command import BuildDoc - - class LocalBuildDoc(BuildDoc): - - builders = ['html', 'man'] - - def generate_autoindex(self): - print "**Autodocumenting from %s" % os.path.abspath(os.curdir) - modules = {} - option_dict = self.distribution.get_option_dict('build_sphinx') - source_dir = os.path.join(option_dict['source_dir'][1], 'api') - if not os.path.exists(source_dir): - os.makedirs(source_dir) - for pkg in self.distribution.packages: - if '.' not in pkg: - os.path.walk(pkg, _find_modules, modules) - module_list = modules.keys() - module_list.sort() - autoindex_filename = os.path.join(source_dir, 'autoindex.rst') - with open(autoindex_filename, 'w') as autoindex: - autoindex.write(""".. toctree:: - :maxdepth: 1 - -""") - for module in module_list: - output_filename = os.path.join(source_dir, - "%s.rst" % module) - heading = "The :mod:`%s` Module" % module - underline = "=" * len(heading) - values = dict(module=module, heading=heading, - underline=underline) - - print "Generating %s" % output_filename - with open(output_filename, 'w') as output_file: - output_file.write(_rst_template % values) - autoindex.write(" %s.rst\n" % module) - - def run(self): - if not os.getenv('SPHINX_DEBUG'): - self.generate_autoindex() - - for builder in self.builders: - self.builder = builder - self.finalize_options() - self.project = self.distribution.get_name() - self.version = self.distribution.get_version() - self.release = self.distribution.get_version() - BuildDoc.run(self) - - class LocalBuildLatex(LocalBuildDoc): - builders = ['latex'] - - cmdclass['build_sphinx'] = LocalBuildDoc - cmdclass['build_sphinx_latex'] = LocalBuildLatex - except ImportError: - pass - - return cmdclass - - -def _get_revno(git_dir): - """Return the number of commits since the most recent tag. - - We use git-describe to find this out, but if there are no - tags then we fall back to counting commits since the beginning - of time. - """ - describe = _run_shell_command( - "git --git-dir=%s describe --always" % git_dir) - if "-" in describe: - return describe.rsplit("-", 2)[-2] - - # no tags found - revlist = _run_shell_command( - "git --git-dir=%s rev-list --abbrev-commit HEAD" % git_dir) - return len(revlist.splitlines()) - - -def _get_version_from_git(pre_version): - """Return a version which is equal to the tag that's on the current - revision if there is one, or tag plus number of additional revisions - if the current revision has no tag.""" - - git_dir = _get_git_directory() - if git_dir: - if pre_version: - try: - return _run_shell_command( - "git --git-dir=" + git_dir + " describe --exact-match", - throw_on_error=True).replace('-', '.') - except Exception: - sha = _run_shell_command( - "git --git-dir=" + git_dir + " log -n1 --pretty=format:%h") - return "%s.a%s.g%s" % (pre_version, _get_revno(git_dir), sha) - else: - return _run_shell_command( - "git --git-dir=" + git_dir + " describe --always").replace( - '-', '.') - return None - - -def _get_version_from_pkg_info(package_name): - """Get the version from PKG-INFO file if we can.""" - try: - pkg_info_file = open('PKG-INFO', 'r') - except (IOError, OSError): - return None - try: - pkg_info = email.message_from_file(pkg_info_file) - except email.MessageError: - return None - # Check to make sure we're in our own dir - if pkg_info.get('Name', None) != package_name: - return None - return pkg_info.get('Version', None) - - -def get_version(package_name, pre_version=None): - """Get the version of the project. First, try getting it from PKG-INFO, if - it exists. If it does, that means we're in a distribution tarball or that - install has happened. Otherwise, if there is no PKG-INFO file, pull the - version from git. - - We do not support setup.py version sanity in git archive tarballs, nor do - we support packagers directly sucking our git repo into theirs. We expect - that a source tarball be made from our git repo - or that if someone wants - to make a source tarball from a fork of our repo with additional tags in it - that they understand and desire the results of doing that. - """ - version = os.environ.get("OSLO_PACKAGE_VERSION", None) - if version: - return version - version = _get_version_from_pkg_info(package_name) - if version: - return version - version = _get_version_from_git(pre_version) - if version: - return version - raise Exception("Versioning for this project requires either an sdist" - " tarball, or access to an upstream git repository.") diff --git a/muranoapi/openstack/common/sslutils.py b/muranoapi/openstack/common/sslutils.py index dc719d9b..5d19a648 100644 --- a/muranoapi/openstack/common/sslutils.py +++ b/muranoapi/openstack/common/sslutils.py @@ -19,7 +19,7 @@ import ssl from oslo.config import cfg -from muranoapi.openstack.common.gettextutils import _ +from muranoapi.openstack.common.gettextutils import _ # noqa ssl_opts = [ @@ -78,3 +78,23 @@ def wrap(sock): ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED return ssl.wrap_socket(sock, **ssl_kwargs) + + +_SSL_PROTOCOLS = { + "tlsv1": ssl.PROTOCOL_TLSv1, + "sslv23": ssl.PROTOCOL_SSLv23, + "sslv3": ssl.PROTOCOL_SSLv3 +} + +try: + _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2 +except AttributeError: + pass + + +def validate_ssl_version(version): + key = version.lower() + try: + return _SSL_PROTOCOLS[key] + except KeyError: + raise RuntimeError(_("Invalid SSL version : %s") % version) diff --git a/muranoapi/openstack/common/threadgroup.py b/muranoapi/openstack/common/threadgroup.py index 2e9fe048..3f474921 100644 --- a/muranoapi/openstack/common/threadgroup.py +++ b/muranoapi/openstack/common/threadgroup.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -from eventlet import greenlet +import eventlet from eventlet import greenpool from eventlet import greenthread @@ -105,7 +105,7 @@ class ThreadGroup(object): for x in self.timers: try: x.wait() - except greenlet.GreenletExit: + except eventlet.greenlet.GreenletExit: pass except Exception as ex: LOG.exception(ex) @@ -115,7 +115,7 @@ class ThreadGroup(object): continue try: x.wait() - except greenlet.GreenletExit: + except eventlet.greenlet.GreenletExit: pass except Exception as ex: LOG.exception(ex) diff --git a/muranoapi/openstack/common/timeutils.py b/muranoapi/openstack/common/timeutils.py index ac2441bc..bd60489e 100644 --- a/muranoapi/openstack/common/timeutils.py +++ b/muranoapi/openstack/common/timeutils.py @@ -23,6 +23,7 @@ import calendar import datetime import iso8601 +import six # ISO 8601 extended time format with microseconds @@ -75,14 +76,14 @@ def normalize_time(timestamp): def is_older_than(before, seconds): """Return True if before is older than seconds.""" - if isinstance(before, basestring): + if isinstance(before, six.string_types): before = parse_strtime(before).replace(tzinfo=None) return utcnow() - before > datetime.timedelta(seconds=seconds) def is_newer_than(after, seconds): """Return True if after is newer than seconds.""" - if isinstance(after, basestring): + if isinstance(after, six.string_types): after = parse_strtime(after).replace(tzinfo=None) return after - utcnow() > datetime.timedelta(seconds=seconds) diff --git a/muranoapi/openstack/common/version.py b/muranoapi/openstack/common/version.py deleted file mode 100644 index 010a6571..00000000 --- a/muranoapi/openstack/common/version.py +++ /dev/null @@ -1,94 +0,0 @@ - -# Copyright 2012 OpenStack Foundation -# Copyright 2012-2013 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Utilities for consuming the version from pkg_resources. -""" - -import pkg_resources - - -class VersionInfo(object): - - def __init__(self, package): - """Object that understands versioning for a package - :param package: name of the python package, such as glance, or - python-glanceclient - """ - self.package = package - self.release = None - self.version = None - self._cached_version = None - - def __str__(self): - """Make the VersionInfo object behave like a string.""" - return self.version_string() - - def __repr__(self): - """Include the name.""" - return "VersionInfo(%s:%s)" % (self.package, self.version_string()) - - def _get_version_from_pkg_resources(self): - """Get the version of the package from the pkg_resources record - associated with the package.""" - try: - requirement = pkg_resources.Requirement.parse(self.package) - provider = pkg_resources.get_provider(requirement) - return provider.version - except pkg_resources.DistributionNotFound: - # The most likely cause for this is running tests in a tree - # produced from a tarball where the package itself has not been - # installed into anything. Revert to setup-time logic. - from muranoapi.openstack.common import setup - return setup.get_version(self.package) - - def release_string(self): - """Return the full version of the package including suffixes indicating - VCS status. - """ - if self.release is None: - self.release = self._get_version_from_pkg_resources() - - return self.release - - def version_string(self): - """Return the short version minus any alpha/beta tags.""" - if self.version is None: - parts = [] - for part in self.release_string().split('.'): - if part[0].isdigit(): - parts.append(part) - else: - break - self.version = ".".join(parts) - - return self.version - - # Compatibility functions - canonical_version_string = version_string - version_string_with_vcs = release_string - - def cached_version_string(self, prefix=""): - """Generate an object which will expand in a string context to - the results of version_string(). We do this so that don't - call into pkg_resources every time we start up a program when - passing version information into the CONF constructor, but - rather only do the calculation when and if a version is requested - """ - if not self._cached_version: - self._cached_version = "%s%s" % (prefix, - self.version_string()) - return self._cached_version diff --git a/openstack-common.conf b/openstack-common.conf index 5f25b8ec..8f88b85a 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,7 +1,6 @@ [DEFAULT] # The list of modules to copy from openstack-common -module=setup module=wsgi module=config module=exception @@ -15,7 +14,6 @@ module=service module=notifier module=local module=install_venv_common -module=version module=timeutils module=eventlet_backdoor module=threadgroup diff --git a/tools/pip-requires b/requirements.txt similarity index 78% rename from tools/pip-requires rename to requirements.txt index f79b4904..e887e969 100644 --- a/tools/pip-requires +++ b/requirements.txt @@ -1,3 +1,5 @@ +d2to1>=0.2.10,<0.3 +pbr>=0.5,<0.6 Babel SQLAlchemy>=0.7,<=0.7.9 anyjson @@ -13,8 +15,8 @@ httplib2 kombu pycrypto>=2.1.0alpha1 iso8601>=0.1.4 -amqplib six +netaddr # Note you will need gcc buildtools installed and must # have installed libxml headers for lxml to be successfully @@ -28,5 +30,5 @@ Paste passlib jsonschema==2.0.0 python-keystoneclient>=0.2.0 - oslo.config +http://github.com/sergmelikyan/murano-common/releases/download/0.1/muranocommon-0.1.tar.gz#egg=muranocommon-0.1 \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 9449f4af..ee8f39de 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,3 +1,48 @@ +# Copyright (c) 2013 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +[metadata] +name = muranoapi +summary = Murano API +description-file = + README.rst +license = Apache License, Version 2.0 +author = Mirantis, Inc. +author-email = murano-all@lists.openstack.org +home-page = htts://launchpad.net/murano +classifier = + Development Status :: 5 - Production/Stable + Environment :: OpenStack + Intended Audience :: Developers + Intended Audience :: Information Technology + License :: OSI Approved :: Apache Software License + Operating System :: OS Independent + Programming Language :: Python + +[files] +packages = + muranoapi + +[global] +setup-hooks = + pbr.hooks.setup_hook + +[entry_points] +console_scripts = + murano-api = muranoapi.cmd.api:main + [build_sphinx] all_files = 1 build-dir = doc/build @@ -20,14 +65,4 @@ input_file = muranoapi/locale/muranoapi.pot [extract_messages] keywords = _ gettext ngettext l_ lazy_gettext mapping_file = babel.cfg -output_file = muranoapi/locale/muranoapi.pot - -[nosetests] -# NOTE(jkoelker) To run the test suite under nose install the following -# coverage http://pypi.python.org/pypi/coverage -# tissue http://pypi.python.org/pypi/tissue (pep8 checker) -# openstack-nose https://github.com/jkoelker/openstack-nose -verbosity=2 -cover-package = muranoapi -cover-html = true -cover-erase = true \ No newline at end of file +output_file = muranoapi/locale/muranoapi.pot \ No newline at end of file diff --git a/setup.py b/setup.py index 194e3522..07fba76d 100644 --- a/setup.py +++ b/setup.py @@ -16,34 +16,11 @@ import setuptools -from muranoapi.openstack.common import setup - -requires = setup.parse_requirements() -depend_links = setup.parse_dependency_links() -project = 'muranoapi' setuptools.setup( - name=project, - version=setup.get_version(project, '2013.1'), - description='The Murano Project API', - license='Apache License (2.0)', - author='Mirantis, Inc', - author_email='smelikyan@mirantis.com', - url='http://muranoapi.mirantis.com/', - packages=setuptools.find_packages(exclude=['bin']), - test_suite='nose.collector', - cmdclass=setup.get_cmdclass(), - include_package_data=True, - install_requires=requires, - dependency_links=depend_links, - classifiers=[ - 'Development Status :: 4 - Beta', - 'License :: OSI Approved :: Apache Software License', - 'Operating System :: POSIX :: Linux', - 'Programming Language :: Python :: 2.7', - 'Environment :: No Input/Output (Daemon)', - 'Environment :: OpenStack', + setup_requires=[ + 'd2to1>=0.2.10,<0.3', + 'pbr>=0.5,<0.6' ], - scripts=['bin/murano-api'], - py_modules=[] + d2to1=True, ) diff --git a/tools/test-requires b/test-requirements.txt similarity index 94% rename from tools/test-requires rename to test-requirements.txt index d89a17c5..29ea7c7e 100644 --- a/tools/test-requires +++ b/test-requirements.txt @@ -18,4 +18,4 @@ mock # Optional packages that should be installed when testing xattr>=0.6.0 -pysendfile==2.0.0 \ No newline at end of file +pysendfile==2.0.0 diff --git a/tools/config/generate_sample.sh b/tools/config/generate_sample.sh new file mode 100755 index 00000000..4db5e42d --- /dev/null +++ b/tools/config/generate_sample.sh @@ -0,0 +1,69 @@ +#!/usr/bin/env bash + +print_hint() { + echo "Try \`${0##*/} --help' for more information." >&2 +} + +PARSED_OPTIONS=$(getopt -n "${0##*/}" -o hb:p:o: \ + --long help,base-dir:,package-name:,output-dir: -- "$@") + +if [ $? != 0 ] ; then print_hint ; exit 1 ; fi + +eval set -- "$PARSED_OPTIONS" + +while true; do + case "$1" in + -h|--help) + echo "${0##*/} [options]" + echo "" + echo "options:" + echo "-h, --help show brief help" + echo "-b, --base-dir=DIR Project base directory (required)" + echo "-p, --package-name=NAME Project package name" + echo "-o, --output-dir=DIR File output directory" + exit 0 + ;; + -b|--base-dir) + shift + BASEDIR=`echo $1 | sed -e 's/\/*$//g'` + shift + ;; + -p|--package-name) + shift + PACKAGENAME=`echo $1` + shift + ;; + -o|--output-dir) + shift + OUTPUTDIR=`echo $1 | sed -e 's/\/*$//g'` + shift + ;; + --) + break + ;; + esac +done + +if [ -z $BASEDIR ] || ! [ -d $BASEDIR ] +then + echo "${0##*/}: missing project base directory" >&2 ; print_hint ; exit 1 +fi + +PACKAGENAME=${PACKAGENAME:-${BASEDIR##*/}} + +OUTPUTDIR=${OUTPUTDIR:-$BASEDIR/etc} +if ! [ -d $OUTPUTDIR ] +then + echo "${0##*/}: cannot access \`$OUTPUTDIR': No such file or directory" >&2 + exit 1 +fi + +BASEDIRESC=`echo $BASEDIR | sed -e 's/\//\\\\\//g'` +FILES=$(find $BASEDIR/$PACKAGENAME -type f -name "*.py" ! -path "*/tests/*" \ + -exec grep -l "Opt(" {} + | sed -e "s/^$BASEDIRESC\///g" | sort -u) + +export EVENTLET_NO_GREENDNS=yes + +MODULEPATH=muranoapi.openstack.common.config.generator +OUTPUTFILE=$OUTPUTDIR/$PACKAGENAME.conf.sample +python -m $MODULEPATH $FILES > $OUTPUTFILE diff --git a/tools/install_venv.py b/tools/install_venv.py index 8a498920..0011a8be 100644 --- a/tools/install_venv.py +++ b/tools/install_venv.py @@ -4,72 +4,74 @@ # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. # -# Copyright 2010 OpenStack LLC. +# Copyright 2010 OpenStack Foundation # Copyright 2013 IBM Corp. +# Copyright (c) 2013 Hewlett-Packard Development Company, L.P. # -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Installation script for Murano API's development virtualenv -""" +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import ConfigParser import os -import subprocess import sys -import install_venv_common as install_venv +import install_venv_common as install_venv # flake8: noqa -def print_help(): +def print_help(project, venv, root): help = """ - Murano API development environment setup is complete. + %(project)s development environment setup is complete. - Murano API development uses virtualenv to track and manage Python dependencies - while in development and testing. + %(project)s development uses virtualenv to track and manage Python + dependencies while in development and testing. - To activate the Murano API virtualenv for the extent of your current shell session - you can run: + To activate the %(project)s virtualenv for the extent of your current + shell session you can run: - $ source .venv/bin/activate + $ source %(venv)s/bin/activate - Or, if you prefer, you can run commands in the virtualenv on a case by case - basis by running: + Or, if you prefer, you can run commands in the virtualenv on a case by + case basis by running: - $ tools/with_venv.sh - - Also, make test will automatically use the virtualenv. + $ %(root)s/tools/with_venv.sh """ - print help + print help % dict(project=project, venv=venv, root=root) def main(argv): root = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) + + if os.environ.get('tools_path'): + root = os.environ['tools_path'] venv = os.path.join(root, '.venv') - pip_requires = os.path.join(root, 'tools', 'pip-requires') - test_requires = os.path.join(root, 'tools', 'test-requires') + if os.environ.get('venv'): + venv = os.environ['venv'] + + pip_requires = os.path.join(root, 'requirements.txt') + test_requires = os.path.join(root, 'test-requirements.txt') py_version = "python%s.%s" % (sys.version_info[0], sys.version_info[1]) - project = 'muranoapi' - install = install_venv.InstallVenv(root, venv, pip_requires, test_requires, - py_version, project) + setup_cfg = ConfigParser.ConfigParser() + setup_cfg.read('setup.cfg') + project = setup_cfg.get('metadata', 'name') + + install = install_venv.InstallVenv( + root, venv, pip_requires, test_requires, py_version, project) options = install.parse_args(argv) install.check_python_version() install.check_dependencies() install.create_virtualenv(no_site_packages=options.no_site_packages) install.install_dependencies() - install.run_command([os.path.join(venv, 'bin/python'), - 'setup.py', 'develop']) install.post_process() - print_help() + print_help(project, venv, root) if __name__ == '__main__': main(sys.argv) diff --git a/tools/install_venv_common.py b/tools/install_venv_common.py index 42a44e8c..f428c1e0 100644 --- a/tools/install_venv_common.py +++ b/tools/install_venv_common.py @@ -34,12 +34,13 @@ import sys class InstallVenv(object): - def __init__(self, root, venv, pip_requires, test_requires, py_version, + def __init__(self, root, venv, requirements, + test_requirements, py_version, project): self.root = root self.venv = venv - self.pip_requires = pip_requires - self.test_requires = test_requires + self.requirements = requirements + self.test_requirements = test_requirements self.py_version = py_version self.project = project @@ -75,11 +76,13 @@ class InstallVenv(object): def get_distro(self): if (os.path.exists('/etc/fedora-release') or os.path.exists('/etc/redhat-release')): - return Fedora(self.root, self.venv, self.pip_requires, - self.test_requires, self.py_version, self.project) + return Fedora( + self.root, self.venv, self.requirements, + self.test_requirements, self.py_version, self.project) else: - return Distro(self.root, self.venv, self.pip_requires, - self.test_requires, self.py_version, self.project) + return Distro( + self.root, self.venv, self.requirements, + self.test_requirements, self.py_version, self.project) def check_dependencies(self): self.get_distro().install_virtualenv() @@ -98,11 +101,6 @@ class InstallVenv(object): else: self.run_command(['virtualenv', '-q', self.venv]) print('done.') - print('Installing pip in venv...', end=' ') - if not self.run_command(['tools/with_venv.sh', 'easy_install', - 'pip>1.0']).strip(): - self.die("Failed to install pip.") - print('done.') else: print("venv already exists...") pass @@ -116,20 +114,12 @@ class InstallVenv(object): print('Installing dependencies with pip (this can take a while)...') # First things first, make sure our venv has the latest pip and - # distribute. - # NOTE: we keep pip at version 1.1 since the most recent version causes - # the .venv creation to fail. See: - # https://bugs.launchpad.net/nova/+bug/1047120 - self.pip_install('pip==1.1') - self.pip_install('distribute') + # setuptools. + self.pip_install('pip>=1.3') + self.pip_install('setuptools') - # Install greenlet by hand - just listing it in the requires file does - # not - # get it installed in the right order - self.pip_install('greenlet') - - self.pip_install('-r', self.pip_requires) - self.pip_install('-r', self.test_requires) + self.pip_install('-r', self.requirements) + self.pip_install('-r', self.test_requirements) def post_process(self): self.get_distro().post_process() diff --git a/tox.ini b/tox.ini index 5a65dace..5845bf98 100644 --- a/tox.ini +++ b/tox.ini @@ -8,8 +8,8 @@ setenv = VIRTUAL_ENV={envdir} NOSE_OPENSTACK_RED=0.05 NOSE_OPENSTACK_YELLOW=0.025 NOSE_OPENSTACK_SHOW_ELAPSED=1 -deps = -r{toxinidir}/tools/pip-requires - -r{toxinidir}/tools/test-requires +deps = -r{toxinidir}/requirements.txt + -r{toxinidir}/test-requirements.txt commands = nosetests [testenv:pep8]