From f3ce2698dc5a02759d4dfd4f00e7ddf4497fb701 Mon Sep 17 00:00:00 2001 From: Kiall Mac Innes Date: Sun, 30 Jun 2013 23:37:19 +0100 Subject: [PATCH] Sync with oslo stable/grizzly c7862b5239822d701b7fb155faa4607eff602627 Change-Id: Ib7f5dd1404bdac1969e947cc310573052760f3f4 --- bin/designate-agent | 2 +- bin/designate-api | 2 +- bin/designate-central | 2 +- bin/designate-rootwrap | 19 +- bin/designate-rpc-zmq-receiver | 53 ++++ bin/designate-sink | 2 +- designate/__init__.py | 2 +- designate/agent/__init__.py | 2 +- designate/agent/rpcapi.py | 2 +- designate/agent/service.py | 6 +- designate/api/__init__.py | 2 +- designate/api/middleware.py | 4 +- designate/api/service.py | 2 +- designate/api/v1/__init__.py | 2 +- designate/backend/impl_bind9.py | 2 +- designate/backend/impl_dnsmasq.py | 2 +- designate/backend/impl_mysqlbind9.py | 2 +- designate/backend/impl_powerdns/__init__.py | 2 +- designate/central/__init__.py | 2 +- designate/central/rpcapi.py | 2 +- designate/central/service.py | 6 +- designate/context.py | 4 +- designate/manage/database.py | 2 +- designate/manage/powerdns.py | 2 +- designate/notification_handler/base.py | 2 +- designate/notification_handler/nova.py | 2 +- designate/notification_handler/quantum.py | 2 +- designate/openstack/common/cfg.py | 2 +- designate/openstack/common/context.py | 8 +- .../openstack/common/eventlet_backdoor.py | 13 +- designate/openstack/common/exception.py | 13 +- designate/openstack/common/excutils.py | 2 +- designate/openstack/common/importutils.py | 2 +- designate/openstack/common/jsonutils.py | 37 +-- designate/openstack/common/local.py | 13 +- designate/openstack/common/log.py | 23 +- designate/openstack/common/loopingcall.py | 64 +--- designate/openstack/common/network_utils.py | 2 +- .../openstack/common/notifier/__init__.py | 2 +- designate/openstack/common/notifier/api.py | 5 +- .../openstack/common/notifier/log_notifier.py | 4 +- .../common/notifier/no_op_notifier.py | 2 +- .../openstack/common/notifier/rpc_notifier.py | 4 +- .../common/notifier/rpc_notifier2.py | 5 +- .../common/notifier/test_notifier.py | 2 +- designate/openstack/common/periodic_task.py | 124 ++------ designate/openstack/common/policy.py | 2 +- designate/openstack/common/processutils.py | 72 +---- .../openstack/common/rootwrap/__init__.py | 2 +- .../openstack/common/rootwrap/filters.py | 2 +- .../openstack/common/rootwrap/wrapper.py | 2 +- designate/openstack/common/rpc/__init__.py | 63 +++- designate/openstack/common/rpc/amqp.py | 281 ++++++++++++++++-- designate/openstack/common/rpc/common.py | 50 ++-- designate/openstack/common/rpc/impl_fake.py | 5 +- designate/openstack/common/rpc/impl_kombu.py | 59 ++-- designate/openstack/common/rpc/impl_qpid.py | 95 ++++-- designate/openstack/common/rpc/impl_zmq.py | 242 ++++++++------- designate/openstack/common/rpc/matchmaker.py | 189 +++++++++++- .../openstack/common/rpc/matchmaker_redis.py | 149 ++++++++++ designate/openstack/common/rpc/service.py | 2 +- designate/openstack/common/service.py | 2 +- designate/openstack/common/sslutils.py | 80 +++++ designate/openstack/common/threadgroup.py | 2 +- designate/openstack/common/timeutils.py | 20 +- designate/openstack/common/wsgi.py | 86 +++++- designate/openstack/common/xmlutils.py | 74 +++++ designate/policy.py | 2 +- designate/quota/__init__.py | 2 +- designate/quota/base.py | 2 +- designate/sink/__init__.py | 2 +- designate/sink/service.py | 14 +- designate/sqlalchemy/session.py | 2 +- designate/storage/__init__.py | 2 +- designate/storage/impl_sqlalchemy/__init__.py | 2 +- designate/tests/__init__.py | 2 +- designate/tests/test_api/test_middleware.py | 6 +- designate/tests/test_backend/__init__.py | 2 +- designate/tests/test_quota/__init__.py | 2 +- designate/tests/test_sink/__init__.py | 20 -- designate/tests/test_sink/test_service.py | 33 -- designate/utils.py | 2 +- openstack-common.conf | 2 + requirements.txt | 1 + 84 files changed, 1409 insertions(+), 629 deletions(-) create mode 100755 bin/designate-rpc-zmq-receiver create mode 100644 designate/openstack/common/rpc/matchmaker_redis.py create mode 100644 designate/openstack/common/sslutils.py create mode 100644 designate/openstack/common/xmlutils.py delete mode 100644 designate/tests/test_sink/__init__.py delete mode 100644 designate/tests/test_sink/test_service.py diff --git a/bin/designate-agent b/bin/designate-agent index 82c0290b..aecdab1b 100755 --- a/bin/designate-agent +++ b/bin/designate-agent @@ -16,7 +16,7 @@ # under the License. import sys import eventlet -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.openstack.common import service from designate import utils diff --git a/bin/designate-api b/bin/designate-api index 664dd2d9..06c29c85 100755 --- a/bin/designate-api +++ b/bin/designate-api @@ -16,7 +16,7 @@ # under the License. import sys import eventlet -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.openstack.common import service from designate import utils diff --git a/bin/designate-central b/bin/designate-central index c6febbec..c63122bb 100755 --- a/bin/designate-central +++ b/bin/designate-central @@ -16,7 +16,7 @@ # under the License. import sys import eventlet -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.openstack.common import service from designate import utils diff --git a/bin/designate-rootwrap b/bin/designate-rootwrap index a37ce6f2..ae8ed83c 100755 --- a/bin/designate-rootwrap +++ b/bin/designate-rootwrap @@ -1,7 +1,7 @@ #!/usr/bin/env python # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2011 OpenStack, LLC. +# Copyright (c) 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -20,14 +20,17 @@ Filters which commands a service is allowed to run as another user. - To use this with designate, you should set the following in designate.conf: + To use this with designate, you should set the following in + designate.conf: rootwrap_config=/etc/designate/rootwrap.conf - You also need to let the designate user run designate-rootwrap as root in sudoers: - designate ALL = (root) NOPASSWD: /usr/bin/designate-rootwrap /etc/designate/rootwrap.conf * + You also need to let the designate user run designate-rootwrap + as root in sudoers: + designate ALL = (root) NOPASSWD: /usr/bin/designate-rootwrap + /etc/designate/rootwrap.conf * - Service packaging should deploy .filters files only on nodes where they are - needed, to avoid allowing more than is necessary. + Service packaging should deploy .filters files only on nodes where + they are needed, to avoid allowing more than is necessary. """ import ConfigParser @@ -102,8 +105,8 @@ if __name__ == '__main__': exec_dirs=config.exec_dirs) if config.use_syslog: logging.info("(%s > %s) Executing %s (filter match = %s)" % ( - os.getlogin(), pwd.getpwuid(os.getuid())[0], - command, filtermatch.name)) + os.getlogin(), pwd.getpwuid(os.getuid())[0], + command, filtermatch.name)) obj = subprocess.Popen(command, stdin=sys.stdin, diff --git a/bin/designate-rpc-zmq-receiver b/bin/designate-rpc-zmq-receiver new file mode 100755 index 00000000..f2de0b61 --- /dev/null +++ b/bin/designate-rpc-zmq-receiver @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +eventlet.monkey_patch() + +import contextlib +import os +import sys + +# If ../designate/__init__.py exists, add ../ to Python search path, so that +# it will override what happens to be installed in /usr/(local/)lib/python... +POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), + os.pardir, + os.pardir)) +if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'designate', '__init__.py')): + sys.path.insert(0, POSSIBLE_TOPDIR) + +from oslo.config import cfg + +from designate.openstack.common import log as logging +from designate.openstack.common import rpc +from designate.openstack.common.rpc import impl_zmq + +CONF = cfg.CONF +CONF.register_opts(rpc.rpc_opts) +CONF.register_opts(impl_zmq.zmq_opts) + + +def main(): + CONF(sys.argv[1:], project='designate') + logging.setup("designate") + + with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor: + reactor.consume_in_thread() + reactor.wait() + +if __name__ == '__main__': + main() diff --git a/bin/designate-sink b/bin/designate-sink index 24ac0a7c..468023a6 100755 --- a/bin/designate-sink +++ b/bin/designate-sink @@ -16,7 +16,7 @@ # under the License. import sys import eventlet -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.openstack.common import service from designate import utils diff --git a/designate/__init__.py b/designate/__init__.py index 18ac9844..5deb5535 100644 --- a/designate/__init__.py +++ b/designate/__init__.py @@ -15,7 +15,7 @@ # under the License. import os import socket -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import rpc cfg.CONF.register_opts([ diff --git a/designate/agent/__init__.py b/designate/agent/__init__.py index 38f5894c..e27c8caf 100644 --- a/designate/agent/__init__.py +++ b/designate/agent/__init__.py @@ -13,7 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg cfg.CONF.register_group(cfg.OptGroup( name='service:agent', title="Configuration for Agent Service" diff --git a/designate/agent/rpcapi.py b/designate/agent/rpcapi.py index 3bae741a..e860f57f 100644 --- a/designate/agent/rpcapi.py +++ b/designate/agent/rpcapi.py @@ -13,7 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.openstack.common.rpc import proxy as rpc_proxy diff --git a/designate/agent/service.py b/designate/agent/service.py index e6e80911..b9acf2fb 100644 --- a/designate/agent/service.py +++ b/designate/agent/service.py @@ -13,7 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.openstack.common.rpc import service as rpc_service from designate import backend @@ -40,6 +40,10 @@ class Service(rpc_service.Service): self.manager.start() super(Service, self).start() + def wait(self): + super(Service, self).wait() + self.conn.consumer_thread.wait() + def stop(self): super(Service, self).stop() self.manager.stop() diff --git a/designate/api/__init__.py b/designate/api/__init__.py index 754a0eb1..26aab356 100644 --- a/designate/api/__init__.py +++ b/designate/api/__init__.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. import flask -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import jsonutils as json cfg.CONF.register_group(cfg.OptGroup( diff --git a/designate/api/middleware.py b/designate/api/middleware.py index ef17e5bc..8f3974a8 100644 --- a/designate/api/middleware.py +++ b/designate/api/middleware.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. import flask -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import local from designate.openstack.common import log as logging from designate.openstack.common import uuidutils @@ -76,7 +76,7 @@ class KeystoneContextMiddleware(wsgi.Middleware): roles = headers.get('X-Roles').split(',') - context = DesignateContext(auth_tok=headers.get('X-Auth-Token'), + context = DesignateContext(auth_token=headers.get('X-Auth-Token'), user=headers.get('X-User-ID'), tenant=headers.get('X-Tenant-ID'), roles=roles) diff --git a/designate/api/service.py b/designate/api/service.py index daa61c60..c7e9c6b4 100644 --- a/designate/api/service.py +++ b/designate/api/service.py @@ -16,7 +16,7 @@ from paste import deploy from designate.openstack.common import log as logging from designate.openstack.common import wsgi -from designate.openstack.common import cfg +from oslo.config import cfg from designate import exceptions from designate import utils from designate import policy diff --git a/designate/api/v1/__init__.py b/designate/api/v1/__init__.py index 3888cf09..faac7174 100644 --- a/designate/api/v1/__init__.py +++ b/designate/api/v1/__init__.py @@ -21,7 +21,7 @@ from werkzeug import exceptions as wexceptions from werkzeug import wrappers from werkzeug.routing import BaseConverter from werkzeug.routing import ValidationError -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import jsonutils as json from designate.openstack.common import log as logging from designate.openstack.common import uuidutils diff --git a/designate/backend/impl_bind9.py b/designate/backend/impl_bind9.py index e8b7a4ac..fa28b32a 100644 --- a/designate/backend/impl_bind9.py +++ b/designate/backend/impl_bind9.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. import os -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate import utils from designate.backend import base diff --git a/designate/backend/impl_dnsmasq.py b/designate/backend/impl_dnsmasq.py index 08f58d63..026ad9d8 100644 --- a/designate/backend/impl_dnsmasq.py +++ b/designate/backend/impl_dnsmasq.py @@ -16,7 +16,7 @@ import os import glob import shutil -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate import utils from designate.backend import base diff --git a/designate/backend/impl_mysqlbind9.py b/designate/backend/impl_mysqlbind9.py index 96a1531c..c9b5ac5e 100644 --- a/designate/backend/impl_mysqlbind9.py +++ b/designate/backend/impl_mysqlbind9.py @@ -16,7 +16,7 @@ # License for the specific language governing permissions and limitations # under the License. import os -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate import utils from designate.backend import base diff --git a/designate/backend/impl_powerdns/__init__.py b/designate/backend/impl_powerdns/__init__.py index e146701f..c645b2cd 100644 --- a/designate/backend/impl_powerdns/__init__.py +++ b/designate/backend/impl_powerdns/__init__.py @@ -19,7 +19,7 @@ import base64 from sqlalchemy.sql import select from sqlalchemy.sql.expression import null from sqlalchemy.orm import exc as sqlalchemy_exceptions -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate import exceptions from designate.backend import base diff --git a/designate/central/__init__.py b/designate/central/__init__.py index 1f8c58e3..88bd1883 100644 --- a/designate/central/__init__.py +++ b/designate/central/__init__.py @@ -13,7 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg # NOTE(kiall): See http://data.iana.org/TLD/tlds-alpha-by-domain.txt # Version 2013031800. diff --git a/designate/central/rpcapi.py b/designate/central/rpcapi.py index d2f70c0a..9d842173 100644 --- a/designate/central/rpcapi.py +++ b/designate/central/rpcapi.py @@ -13,7 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.openstack.common.rpc import proxy as rpc_proxy diff --git a/designate/central/service.py b/designate/central/service.py index abaaf2f9..77e1afb1 100644 --- a/designate/central/service.py +++ b/designate/central/service.py @@ -15,7 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. import re -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.openstack.common.rpc import service as rpc_service from designate import exceptions @@ -56,6 +56,10 @@ class Service(rpc_service.Service): super(Service, self).start() + def wait(self): + super(Service, self).wait() + self.conn.consumer_thread.wait() + def stop(self): super(Service, self).stop() diff --git a/designate/context.py b/designate/context.py index c6c147bc..968e72ed 100644 --- a/designate/context.py +++ b/designate/context.py @@ -22,11 +22,11 @@ LOG = logging.getLogger(__name__) class DesignateContext(context.RequestContext): - def __init__(self, auth_tok=None, user=None, tenant=None, is_admin=False, + def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False, read_only=False, show_deleted=False, request_id=None, roles=[]): super(DesignateContext, self).__init__( - auth_tok=auth_tok, + auth_token=auth_token, user=user, tenant=tenant, is_admin=is_admin, diff --git a/designate/manage/database.py b/designate/manage/database.py index d8dedfa1..cfeec053 100644 --- a/designate/manage/database.py +++ b/designate/manage/database.py @@ -18,7 +18,7 @@ from migrate.exceptions import (DatabaseAlreadyControlledError, DatabaseNotControlledError) from migrate.versioning import api as versioning_api from designate.openstack.common import log as logging -from designate.openstack.common import cfg +from oslo.config import cfg from designate.manage import base LOG = logging.getLogger(__name__) diff --git a/designate/manage/powerdns.py b/designate/manage/powerdns.py index f631b65a..3811fe05 100644 --- a/designate/manage/powerdns.py +++ b/designate/manage/powerdns.py @@ -17,7 +17,7 @@ import os from migrate.exceptions import DatabaseAlreadyControlledError from migrate.versioning import api as versioning_api from designate.openstack.common import log as logging -from designate.openstack.common import cfg +from oslo.config import cfg from designate.manage import base LOG = logging.getLogger(__name__) diff --git a/designate/notification_handler/base.py b/designate/notification_handler/base.py index 8c359a54..2b1b6d3d 100644 --- a/designate/notification_handler/base.py +++ b/designate/notification_handler/base.py @@ -15,7 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. import abc -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.central import rpcapi as central_rpcapi from designate.context import DesignateContext diff --git a/designate/notification_handler/nova.py b/designate/notification_handler/nova.py index d655197b..94fb5b2d 100644 --- a/designate/notification_handler/nova.py +++ b/designate/notification_handler/nova.py @@ -13,7 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.notification_handler.base import BaseAddressHandler diff --git a/designate/notification_handler/quantum.py b/designate/notification_handler/quantum.py index f97eaf0f..f0c76877 100644 --- a/designate/notification_handler/quantum.py +++ b/designate/notification_handler/quantum.py @@ -13,7 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.notification_handler.base import BaseAddressHandler diff --git a/designate/openstack/common/cfg.py b/designate/openstack/common/cfg.py index 03495204..fd259c4f 100644 --- a/designate/openstack/common/cfg.py +++ b/designate/openstack/common/cfg.py @@ -220,7 +220,7 @@ log files:: This module also contains a global instance of the ConfigOpts class in order to support a common usage pattern in OpenStack:: - from designate.openstack.common import cfg + from oslo.config import cfg opts = [ cfg.StrOpt('bind_host', default='0.0.0.0'), diff --git a/designate/openstack/common/context.py b/designate/openstack/common/context.py index dd7dd04c..e9cfd73c 100644 --- a/designate/openstack/common/context.py +++ b/designate/openstack/common/context.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -37,9 +37,9 @@ class RequestContext(object): accesses the system, as well as additional request information. """ - def __init__(self, auth_tok=None, user=None, tenant=None, is_admin=False, + def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False, read_only=False, show_deleted=False, request_id=None): - self.auth_tok = auth_tok + self.auth_token = auth_token self.user = user self.tenant = tenant self.is_admin = is_admin @@ -55,7 +55,7 @@ class RequestContext(object): 'is_admin': self.is_admin, 'read_only': self.read_only, 'show_deleted': self.show_deleted, - 'auth_token': self.auth_tok, + 'auth_token': self.auth_token, 'request_id': self.request_id} diff --git a/designate/openstack/common/eventlet_backdoor.py b/designate/openstack/common/eventlet_backdoor.py index 239d0098..c0ad460f 100644 --- a/designate/openstack/common/eventlet_backdoor.py +++ b/designate/openstack/common/eventlet_backdoor.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2012 Openstack, LLC. +# Copyright (c) 2012 OpenStack Foundation. # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. # @@ -24,8 +24,7 @@ import traceback import eventlet import eventlet.backdoor import greenlet - -from designate.openstack.common import cfg +from oslo.config import cfg eventlet_backdoor_opts = [ cfg.IntOpt('backdoor_port', @@ -52,12 +51,20 @@ def _print_greenthreads(): print +def _print_nativethreads(): + for threadId, stack in sys._current_frames().items(): + print threadId + traceback.print_stack(stack) + print + + def initialize_if_enabled(): backdoor_locals = { 'exit': _dont_use_this, # So we don't exit the entire process 'quit': _dont_use_this, # So we don't exit the entire process 'fo': _find_objects, 'pgt': _print_greenthreads, + 'pnt': _print_nativethreads, } if CONF.backdoor_port is None: diff --git a/designate/openstack/common/exception.py b/designate/openstack/common/exception.py index e6f2c33e..b240b9b7 100644 --- a/designate/openstack/common/exception.py +++ b/designate/openstack/common/exception.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -23,6 +23,8 @@ import logging from designate.openstack.common.gettextutils import _ +_FATAL_EXCEPTION_FORMAT_ERRORS = False + class Error(Exception): def __init__(self, message=None): @@ -121,9 +123,12 @@ class OpenstackException(Exception): try: self._error_string = self.message % kwargs - except Exception: - # at least get the core message out if something happened - self._error_string = self.message + except Exception as e: + if _FATAL_EXCEPTION_FORMAT_ERRORS: + raise e + else: + # at least get the core message out if something happened + self._error_string = self.message def __str__(self): return self._error_string diff --git a/designate/openstack/common/excutils.py b/designate/openstack/common/excutils.py index a936246b..329b0c72 100644 --- a/designate/openstack/common/excutils.py +++ b/designate/openstack/common/excutils.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # Copyright 2012, Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/designate/openstack/common/importutils.py b/designate/openstack/common/importutils.py index 9dec764f..3bd277f4 100644 --- a/designate/openstack/common/importutils.py +++ b/designate/openstack/common/importutils.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/designate/openstack/common/jsonutils.py b/designate/openstack/common/jsonutils.py index 701f85c8..fce4fd30 100644 --- a/designate/openstack/common/jsonutils.py +++ b/designate/openstack/common/jsonutils.py @@ -34,6 +34,7 @@ This module provides a few things: import datetime +import functools import inspect import itertools import json @@ -42,7 +43,8 @@ import xmlrpclib from designate.openstack.common import timeutils -def to_primitive(value, convert_instances=False, level=0): +def to_primitive(value, convert_instances=False, convert_datetime=True, + level=0, max_depth=3): """Convert a complex object into primitives. Handy for JSON serialization. We can optionally handle instances, @@ -78,12 +80,17 @@ def to_primitive(value, convert_instances=False, level=0): if getattr(value, '__module__', None) == 'mox': return 'mock' - if level > 3: + if level > max_depth: return '?' # The try block may not be necessary after the class check above, # but just in case ... try: + recursive = functools.partial(to_primitive, + convert_instances=convert_instances, + convert_datetime=convert_datetime, + level=level, + max_depth=max_depth) # It's not clear why xmlrpclib created their own DateTime type, but # for our purposes, make it a datetime type which is explicitly # handled @@ -91,33 +98,19 @@ def to_primitive(value, convert_instances=False, level=0): value = datetime.datetime(*tuple(value.timetuple())[:6]) if isinstance(value, (list, tuple)): - o = [] - for v in value: - o.append(to_primitive(v, convert_instances=convert_instances, - level=level)) - return o + return [recursive(v) for v in value] elif isinstance(value, dict): - o = {} - for k, v in value.iteritems(): - o[k] = to_primitive(v, convert_instances=convert_instances, - level=level) - return o - elif isinstance(value, datetime.datetime): + return dict((k, recursive(v)) for k, v in value.iteritems()) + elif convert_datetime and isinstance(value, datetime.datetime): return timeutils.strtime(value) elif hasattr(value, 'iteritems'): - return to_primitive(dict(value.iteritems()), - convert_instances=convert_instances, - level=level + 1) + return recursive(dict(value.iteritems()), level=level + 1) elif hasattr(value, '__iter__'): - return to_primitive(list(value), - convert_instances=convert_instances, - level=level) + return recursive(list(value)) elif convert_instances and hasattr(value, '__dict__'): # Likely an instance of something. Watch for cycles. # Ignore class member vars. - return to_primitive(value.__dict__, - convert_instances=convert_instances, - level=level + 1) + return recursive(value.__dict__, level=level + 1) else: return value except TypeError: diff --git a/designate/openstack/common/local.py b/designate/openstack/common/local.py index 19d96273..f1bfc824 100644 --- a/designate/openstack/common/local.py +++ b/designate/openstack/common/local.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -26,6 +26,9 @@ class WeakLocal(corolocal.local): def __getattribute__(self, attr): rval = corolocal.local.__getattribute__(self, attr) if rval: + # NOTE(mikal): this bit is confusing. What is stored is a weak + # reference, not the value itself. We therefore need to lookup + # the weak reference and return the inner value here. rval = rval() return rval @@ -34,4 +37,12 @@ class WeakLocal(corolocal.local): return corolocal.local.__setattr__(self, attr, value) +# NOTE(mikal): the name "store" should be deprecated in the future store = WeakLocal() + +# A "weak" store uses weak references and allows an object to fall out of scope +# when it falls out of scope in the code that uses the thread local storage. A +# "strong" store will hold a reference to the object so that it never falls out +# of scope. +weak_store = WeakLocal() +strong_store = corolocal.local diff --git a/designate/openstack/common/log.py b/designate/openstack/common/log.py index 07c6be32..f3df5773 100644 --- a/designate/openstack/common/log.py +++ b/designate/openstack/common/log.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -40,7 +40,8 @@ import stat import sys import traceback -from designate.openstack.common import cfg +from oslo.config import cfg + from designate.openstack.common.gettextutils import _ from designate.openstack.common import jsonutils from designate.openstack.common import local @@ -324,16 +325,11 @@ def _create_logging_excepthook(product_name): def setup(product_name): """Setup logging.""" - sys.excepthook = _create_logging_excepthook(product_name) - if CONF.log_config: - try: - logging.config.fileConfig(CONF.log_config) - except Exception: - traceback.print_exc() - raise + logging.config.fileConfig(CONF.log_config) else: - _setup_logging_from_conf(product_name) + _setup_logging_from_conf() + sys.excepthook = _create_logging_excepthook(product_name) def set_defaults(logging_context_format_string): @@ -366,8 +362,8 @@ def _find_facility_from_conf(): return facility -def _setup_logging_from_conf(product_name): - log_root = getLogger(product_name).logger +def _setup_logging_from_conf(): + log_root = getLogger(None).logger for handler in log_root.handlers: log_root.removeHandler(handler) @@ -405,7 +401,8 @@ def _setup_logging_from_conf(product_name): if CONF.log_format: handler.setFormatter(logging.Formatter(fmt=CONF.log_format, datefmt=datefmt)) - handler.setFormatter(LegacyFormatter(datefmt=datefmt)) + else: + handler.setFormatter(LegacyFormatter(datefmt=datefmt)) if CONF.debug: log_root.setLevel(logging.DEBUG) diff --git a/designate/openstack/common/loopingcall.py b/designate/openstack/common/loopingcall.py index 54f18e50..bbac8bb2 100644 --- a/designate/openstack/common/loopingcall.py +++ b/designate/openstack/common/loopingcall.py @@ -46,23 +46,12 @@ class LoopingCallDone(Exception): self.retvalue = retvalue -class LoopingCallBase(object): +class LoopingCall(object): def __init__(self, f=None, *args, **kw): self.args = args self.kw = kw self.f = f self._running = False - self.done = None - - def stop(self): - self._running = False - - def wait(self): - return self.done.wait() - - -class FixedIntervalLoopingCall(LoopingCallBase): - """A fixed interval looping call.""" def start(self, interval, initial_delay=None): self._running = True @@ -88,7 +77,7 @@ class FixedIntervalLoopingCall(LoopingCallBase): self.stop() done.send(e.retvalue) except Exception: - LOG.exception(_('in fixed duration looping call')) + LOG.exception(_('in looping call')) done.send_exception(*sys.exc_info()) return else: @@ -99,49 +88,8 @@ class FixedIntervalLoopingCall(LoopingCallBase): greenthread.spawn_n(_inner) return self.done + def stop(self): + self._running = False -# TODO(mikal): this class name is deprecated in Havana and should be removed -# in the I release -LoopingCall = FixedIntervalLoopingCall - - -class DynamicLoopingCall(LoopingCallBase): - """A looping call which sleeps until the next known event. - - The function called should return how long to sleep for before being - called again. - """ - - def start(self, initial_delay=None, periodic_interval_max=None): - self._running = True - done = event.Event() - - def _inner(): - if initial_delay: - greenthread.sleep(initial_delay) - - try: - while self._running: - idle = self.f(*self.args, **self.kw) - if not self._running: - break - - if periodic_interval_max is not None: - idle = min(idle, periodic_interval_max) - LOG.debug(_('Dynamic looping call sleeping for %.02f ' - 'seconds'), idle) - greenthread.sleep(idle) - except LoopingCallDone, e: - self.stop() - done.send(e.retvalue) - except Exception: - LOG.exception(_('in dynamic looping call')) - done.send_exception(*sys.exc_info()) - return - else: - done.send(True) - - self.done = done - - greenthread.spawn(_inner) - return self.done + def wait(self): + return self.done.wait() diff --git a/designate/openstack/common/network_utils.py b/designate/openstack/common/network_utils.py index 69f67321..5224e01a 100644 --- a/designate/openstack/common/network_utils.py +++ b/designate/openstack/common/network_utils.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2012 OpenStack LLC. +# Copyright 2012 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/designate/openstack/common/notifier/__init__.py b/designate/openstack/common/notifier/__init__.py index 482d54e4..45c3b46a 100644 --- a/designate/openstack/common/notifier/__init__.py +++ b/designate/openstack/common/notifier/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/designate/openstack/common/notifier/api.py b/designate/openstack/common/notifier/api.py index 55dc6016..bd4825dc 100644 --- a/designate/openstack/common/notifier/api.py +++ b/designate/openstack/common/notifier/api.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -15,7 +15,8 @@ import uuid -from designate.openstack.common import cfg +from oslo.config import cfg + from designate.openstack.common import context from designate.openstack.common.gettextutils import _ from designate.openstack.common import importutils diff --git a/designate/openstack/common/notifier/log_notifier.py b/designate/openstack/common/notifier/log_notifier.py index 4439f2a6..975d3fe1 100644 --- a/designate/openstack/common/notifier/log_notifier.py +++ b/designate/openstack/common/notifier/log_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -13,8 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo.config import cfg -from designate.openstack.common import cfg from designate.openstack.common import jsonutils from designate.openstack.common import log as logging diff --git a/designate/openstack/common/notifier/no_op_notifier.py b/designate/openstack/common/notifier/no_op_notifier.py index ee1ddbdc..bc7a56ca 100644 --- a/designate/openstack/common/notifier/no_op_notifier.py +++ b/designate/openstack/common/notifier/no_op_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/designate/openstack/common/notifier/rpc_notifier.py b/designate/openstack/common/notifier/rpc_notifier.py index 22f37b35..2c85f038 100644 --- a/designate/openstack/common/notifier/rpc_notifier.py +++ b/designate/openstack/common/notifier/rpc_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -13,8 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo.config import cfg -from designate.openstack.common import cfg from designate.openstack.common import context as req_context from designate.openstack.common.gettextutils import _ from designate.openstack.common import log as logging diff --git a/designate/openstack/common/notifier/rpc_notifier2.py b/designate/openstack/common/notifier/rpc_notifier2.py index 90d1d0d7..e31c0a6c 100644 --- a/designate/openstack/common/notifier/rpc_notifier2.py +++ b/designate/openstack/common/notifier/rpc_notifier2.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -15,7 +15,8 @@ '''messaging based notification driver, with message envelopes''' -from designate.openstack.common import cfg +from oslo.config import cfg + from designate.openstack.common import context as req_context from designate.openstack.common.gettextutils import _ from designate.openstack.common import log as logging diff --git a/designate/openstack/common/notifier/test_notifier.py b/designate/openstack/common/notifier/test_notifier.py index 5e348803..96c1746b 100644 --- a/designate/openstack/common/notifier/test_notifier.py +++ b/designate/openstack/common/notifier/test_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/designate/openstack/common/periodic_task.py b/designate/openstack/common/periodic_task.py index d7c112ec..eaa6a486 100644 --- a/designate/openstack/common/periodic_task.py +++ b/designate/openstack/common/periodic_task.py @@ -13,72 +13,26 @@ # License for the specific language governing permissions and limitations # under the License. -import datetime -import time - from designate.openstack.common.gettextutils import _ -from designate.openstack.common import cfg from designate.openstack.common import log as logging -from designate.openstack.common import timeutils - - -periodic_opts = [ - cfg.BoolOpt('run_external_periodic_tasks', - default=True, - help=('Some periodic tasks can be run in a separate process. ' - 'Should we run them here?')), -] - -CONF = cfg.CONF -CONF.register_opts(periodic_opts) LOG = logging.getLogger(__name__) -DEFAULT_INTERVAL = 60.0 - - -class InvalidPeriodicTaskArg(Exception): - message = _("Unexpected argument for periodic task creation: %(arg)s.") - def periodic_task(*args, **kwargs): """Decorator to indicate that a method is a periodic task. This decorator can be used in two ways: - 1. Without arguments '@periodic_task', this will be run on every cycle + 1. Without arguments '@periodic_task', this will be run on every tick of the periodic scheduler. - 2. With arguments: - @periodic_task(spacing=N [, run_immediately=[True|False]]) - this will be run on approximately every N seconds. If this number is - negative the periodic task will be disabled. If the run_immediately - argument is provided and has a value of 'True', the first run of the - task will be shortly after task scheduler starts. If - run_immediately is omitted or set to 'False', the first time the - task runs will be approximately N seconds after the task scheduler - starts. + 2. With arguments, @periodic_task(ticks_between_runs=N), this will be + run on every N ticks of the periodic scheduler. """ def decorator(f): - # Test for old style invocation - if 'ticks_between_runs' in kwargs: - raise InvalidPeriodicTaskArg(arg='ticks_between_runs') - - # Control if run at all f._periodic_task = True - f._periodic_external_ok = kwargs.pop('external_process_ok', False) - if f._periodic_external_ok and not CONF.run_external_periodic_tasks: - f._periodic_enabled = False - else: - f._periodic_enabled = kwargs.pop('enabled', True) - - # Control frequency - f._periodic_spacing = kwargs.pop('spacing', 0) - f._periodic_immediate = kwargs.pop('run_immediately', False) - if f._periodic_immediate: - f._periodic_last_run = None - else: - f._periodic_last_run = timeutils.utcnow() + f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0) return f # NOTE(sirp): The `if` is necessary to allow the decorator to be used with @@ -105,7 +59,7 @@ class _PeriodicTasksMeta(type): super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_) # NOTE(sirp): if the attribute is not present then we must be the base - # class, so, go ahead an initialize it. If the attribute is present, + # class, so, go ahead and initialize it. If the attribute is present, # then we're a subclass so make a copy of it so we don't step on our # parent's toes. try: @@ -114,39 +68,20 @@ class _PeriodicTasksMeta(type): cls._periodic_tasks = [] try: - cls._periodic_last_run = cls._periodic_last_run.copy() + cls._ticks_to_skip = cls._ticks_to_skip.copy() except AttributeError: - cls._periodic_last_run = {} - - try: - cls._periodic_spacing = cls._periodic_spacing.copy() - except AttributeError: - cls._periodic_spacing = {} + cls._ticks_to_skip = {} + # This uses __dict__ instead of + # inspect.getmembers(cls, inspect.ismethod) so only the methods of the + # current class are added when this class is scanned, and base classes + # are not added redundantly. for value in cls.__dict__.values(): if getattr(value, '_periodic_task', False): task = value name = task.__name__ - - if task._periodic_spacing < 0: - LOG.info(_('Skipping periodic task %(task)s because ' - 'its interval is negative'), - {'task': name}) - continue - if not task._periodic_enabled: - LOG.info(_('Skipping periodic task %(task)s because ' - 'it is disabled'), - {'task': name}) - continue - - # A periodic spacing of zero indicates that this task should - # be run every pass - if task._periodic_spacing == 0: - task._periodic_spacing = None - cls._periodic_tasks.append((name, task)) - cls._periodic_spacing[name] = task._periodic_spacing - cls._periodic_last_run[name] = task._periodic_last_run + cls._ticks_to_skip[name] = task._ticks_between_runs class PeriodicTasks(object): @@ -154,34 +89,27 @@ class PeriodicTasks(object): def run_periodic_tasks(self, context, raise_on_error=False): """Tasks to be run at a periodic interval.""" - idle_for = DEFAULT_INTERVAL for task_name, task in self._periodic_tasks: full_task_name = '.'.join([self.__class__.__name__, task_name]) - now = timeutils.utcnow() - spacing = self._periodic_spacing[task_name] - last_run = self._periodic_last_run[task_name] + ticks_to_skip = self._ticks_to_skip[task_name] + if ticks_to_skip > 0: + LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s" + " ticks left until next run"), + dict(full_task_name=full_task_name, + ticks_to_skip=ticks_to_skip)) + self._ticks_to_skip[task_name] -= 1 + continue - # If a periodic task is _nearly_ due, then we'll run it early - if spacing is not None and last_run is not None: - due = last_run + datetime.timedelta(seconds=spacing) - if not timeutils.is_soon(due, 0.2): - idle_for = min(idle_for, timeutils.delta_seconds(now, due)) - continue - - if spacing is not None: - idle_for = min(idle_for, spacing) - - LOG.debug(_("Running periodic task %(full_task_name)s"), locals()) - self._periodic_last_run[task_name] = timeutils.utcnow() + self._ticks_to_skip[task_name] = task._ticks_between_runs + LOG.debug(_("Running periodic task %(full_task_name)s"), + dict(full_task_name=full_task_name)) try: task(self, context) except Exception as e: if raise_on_error: raise - LOG.exception(_("Error during %(full_task_name)s: %(e)s"), - locals()) - time.sleep(0) - - return idle_for + LOG.exception(_("Error during %(full_task_name)s:" + " %(e)s"), + dict(e=e, full_task_name=full_task_name)) diff --git a/designate/openstack/common/policy.py b/designate/openstack/common/policy.py index 0eff06bf..355186db 100644 --- a/designate/openstack/common/policy.py +++ b/designate/openstack/common/policy.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2012 OpenStack, LLC. +# Copyright (c) 2012 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/designate/openstack/common/processutils.py b/designate/openstack/common/processutils.py index 44255d13..e1154f79 100644 --- a/designate/openstack/common/processutils.py +++ b/designate/openstack/common/processutils.py @@ -19,16 +19,14 @@ System-level utilities and helper functions. """ -import os +import logging import random import shlex -import signal from eventlet.green import subprocess from eventlet import greenthread from designate.openstack.common.gettextutils import _ -from designate.openstack.common import log as logging LOG = logging.getLogger(__name__) @@ -42,12 +40,6 @@ class UnknownArgumentError(Exception): class ProcessExecutionError(Exception): def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None, description=None): - self.exit_code = exit_code - self.stderr = stderr - self.stdout = stdout - self.cmd = cmd - self.description = description - if description is None: description = "Unexpected error while running command." if exit_code is None: @@ -57,17 +49,6 @@ class ProcessExecutionError(Exception): super(ProcessExecutionError, self).__init__(message) -class NoRootWrapSpecified(Exception): - def __init__(self, message=None): - super(NoRootWrapSpecified, self).__init__(message) - - -def _subprocess_setup(): - # Python installs a SIGPIPE handler by default. This is usually not what - # non-Python subprocesses expect. - signal.signal(signal.SIGPIPE, signal.SIG_DFL) - - def execute(*cmd, **kwargs): """ Helper method to shell out and execute a command through subprocess with @@ -77,11 +58,11 @@ def execute(*cmd, **kwargs): :type cmd: string :param process_input: Send to opened process. :type proces_input: string - :param check_exit_code: Single bool, int, or list of allowed exit - codes. Defaults to [0]. Raise - :class:`ProcessExecutionError` unless - program exits with one of these code. - :type check_exit_code: boolean, int, or [int] + :param check_exit_code: Defaults to 0. Will raise + :class:`ProcessExecutionError` + if the command exits without returning this value + as a returncode + :type check_exit_code: int :param delay_on_retry: True | False. Defaults to True. If set to True, wait a short amount of time before retrying. :type delay_on_retry: boolean @@ -91,12 +72,8 @@ def execute(*cmd, **kwargs): the command is prefixed by the command specified in the root_helper kwarg. :type run_as_root: boolean - :param root_helper: command to prefix to commands called with - run_as_root=True + :param root_helper: command to prefix all cmd's with :type root_helper: string - :param shell: whether or not there should be a shell used to - execute this command. Defaults to false. - :type shell: boolean :returns: (stdout, stderr) from process execution :raises: :class:`UnknownArgumentError` on receiving unknown arguments @@ -104,31 +81,16 @@ def execute(*cmd, **kwargs): """ process_input = kwargs.pop('process_input', None) - check_exit_code = kwargs.pop('check_exit_code', [0]) - ignore_exit_code = False + check_exit_code = kwargs.pop('check_exit_code', 0) delay_on_retry = kwargs.pop('delay_on_retry', True) attempts = kwargs.pop('attempts', 1) run_as_root = kwargs.pop('run_as_root', False) root_helper = kwargs.pop('root_helper', '') - shell = kwargs.pop('shell', False) - - if isinstance(check_exit_code, bool): - ignore_exit_code = not check_exit_code - check_exit_code = [0] - elif isinstance(check_exit_code, int): - check_exit_code = [check_exit_code] - if len(kwargs): raise UnknownArgumentError(_('Got unknown keyword args ' 'to utils.execute: %r') % kwargs) - - if run_as_root and os.geteuid() != 0: - if not root_helper: - raise NoRootWrapSpecified( - message=('Command requested root, but did not specify a root ' - 'helper.')) + if run_as_root: cmd = shlex.split(root_helper) + list(cmd) - cmd = map(str, cmd) while attempts > 0: @@ -136,21 +98,11 @@ def execute(*cmd, **kwargs): try: LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd)) _PIPE = subprocess.PIPE # pylint: disable=E1101 - - if os.name == 'nt': - preexec_fn = None - close_fds = False - else: - preexec_fn = _subprocess_setup - close_fds = True - obj = subprocess.Popen(cmd, stdin=_PIPE, stdout=_PIPE, stderr=_PIPE, - close_fds=close_fds, - preexec_fn=preexec_fn, - shell=shell) + close_fds=True) result = None if process_input is not None: result = obj.communicate(process_input) @@ -160,7 +112,9 @@ def execute(*cmd, **kwargs): _returncode = obj.returncode # pylint: disable=E1101 if _returncode: LOG.debug(_('Result was %s') % _returncode) - if not ignore_exit_code and _returncode not in check_exit_code: + if (isinstance(check_exit_code, int) and + not isinstance(check_exit_code, bool) and + _returncode != check_exit_code): (stdout, stderr) = result raise ProcessExecutionError(exit_code=_returncode, stdout=stdout, diff --git a/designate/openstack/common/rootwrap/__init__.py b/designate/openstack/common/rootwrap/__init__.py index 671d3c17..2d32e4ef 100644 --- a/designate/openstack/common/rootwrap/__init__.py +++ b/designate/openstack/common/rootwrap/__init__.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2011 OpenStack, LLC. +# Copyright (c) 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/designate/openstack/common/rootwrap/filters.py b/designate/openstack/common/rootwrap/filters.py index 905bbabe..eadda256 100644 --- a/designate/openstack/common/rootwrap/filters.py +++ b/designate/openstack/common/rootwrap/filters.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2011 OpenStack, LLC. +# Copyright (c) 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/designate/openstack/common/rootwrap/wrapper.py b/designate/openstack/common/rootwrap/wrapper.py index 969b893a..2c95fc1f 100644 --- a/designate/openstack/common/rootwrap/wrapper.py +++ b/designate/openstack/common/rootwrap/wrapper.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2011 OpenStack, LLC. +# Copyright (c) 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/designate/openstack/common/rpc/__init__.py b/designate/openstack/common/rpc/__init__.py index 00938af6..d7275e33 100644 --- a/designate/openstack/common/rpc/__init__.py +++ b/designate/openstack/common/rpc/__init__.py @@ -25,8 +25,17 @@ For some wrappers that add message versioning to rpc, see: rpc.proxy """ -from designate.openstack.common import cfg +import inspect +import logging + +from oslo.config import cfg + +from designate.openstack.common.gettextutils import _ from designate.openstack.common import importutils +from designate.openstack.common import local + + +LOG = logging.getLogger(__name__) rpc_opts = [ @@ -62,7 +71,8 @@ rpc_opts = [ help='AMQP exchange to connect to if using RabbitMQ or Qpid'), ] -cfg.CONF.register_opts(rpc_opts) +CONF = cfg.CONF +CONF.register_opts(rpc_opts) def set_defaults(control_exchange): @@ -83,10 +93,27 @@ def create_connection(new=True): :returns: An instance of openstack.common.rpc.common.Connection """ - return _get_impl().create_connection(cfg.CONF, new=new) + return _get_impl().create_connection(CONF, new=new) -def call(context, topic, msg, timeout=None): +def _check_for_lock(): + if not CONF.debug: + return None + + if ((hasattr(local.strong_store, 'locks_held') + and local.strong_store.locks_held)): + stack = ' :: '.join([frame[3] for frame in inspect.stack()]) + LOG.warn(_('A RPC is being made while holding a lock. The locks ' + 'currently held are %(locks)s. This is probably a bug. ' + 'Please report it. Include the following: [%(stack)s].'), + {'locks': local.strong_store.locks_held, + 'stack': stack}) + return True + + return False + + +def call(context, topic, msg, timeout=None, check_for_lock=False): """Invoke a remote method that returns something. :param context: Information that identifies the user that has made this @@ -100,13 +127,17 @@ def call(context, topic, msg, timeout=None): "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. If set, this overrides the rpc_response_timeout option. + :param check_for_lock: if True, a warning is emitted if a RPC call is made + with a lock held. :returns: A dict from the remote method. :raises: openstack.common.rpc.common.Timeout if a complete response is not received before the timeout is reached. """ - return _get_impl().call(cfg.CONF, context, topic, msg, timeout) + if check_for_lock: + _check_for_lock() + return _get_impl().call(CONF, context, topic, msg, timeout) def cast(context, topic, msg): @@ -124,7 +155,7 @@ def cast(context, topic, msg): :returns: None """ - return _get_impl().cast(cfg.CONF, context, topic, msg) + return _get_impl().cast(CONF, context, topic, msg) def fanout_cast(context, topic, msg): @@ -145,10 +176,10 @@ def fanout_cast(context, topic, msg): :returns: None """ - return _get_impl().fanout_cast(cfg.CONF, context, topic, msg) + return _get_impl().fanout_cast(CONF, context, topic, msg) -def multicall(context, topic, msg, timeout=None): +def multicall(context, topic, msg, timeout=None, check_for_lock=False): """Invoke a remote method and get back an iterator. In this case, the remote method will be returning multiple values in @@ -166,6 +197,8 @@ def multicall(context, topic, msg, timeout=None): "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. If set, this overrides the rpc_response_timeout option. + :param check_for_lock: if True, a warning is emitted if a RPC call is made + with a lock held. :returns: An iterator. The iterator will yield a tuple (N, X) where N is an index that starts at 0 and increases by one for each value @@ -175,7 +208,9 @@ def multicall(context, topic, msg, timeout=None): :raises: openstack.common.rpc.common.Timeout if a complete response is not received before the timeout is reached. """ - return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) + if check_for_lock: + _check_for_lock() + return _get_impl().multicall(CONF, context, topic, msg, timeout) def notify(context, topic, msg, envelope=False): @@ -217,7 +252,7 @@ def cast_to_server(context, server_params, topic, msg): :returns: None """ - return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic, + return _get_impl().cast_to_server(CONF, context, server_params, topic, msg) @@ -233,7 +268,7 @@ def fanout_cast_to_server(context, server_params, topic, msg): :returns: None """ - return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params, + return _get_impl().fanout_cast_to_server(CONF, context, server_params, topic, msg) @@ -263,10 +298,10 @@ def _get_impl(): global _RPCIMPL if _RPCIMPL is None: try: - _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend) + _RPCIMPL = importutils.import_module(CONF.rpc_backend) except ImportError: # For backwards compatibility with older nova config. - impl = cfg.CONF.rpc_backend.replace('nova.rpc', - 'nova.openstack.common.rpc') + impl = CONF.rpc_backend.replace('nova.rpc', + 'nova.openstack.common.rpc') _RPCIMPL = importutils.import_module(impl) return _RPCIMPL diff --git a/designate/openstack/common/rpc/amqp.py b/designate/openstack/common/rpc/amqp.py index c52c7c25..071442f5 100644 --- a/designate/openstack/common/rpc/amqp.py +++ b/designate/openstack/common/rpc/amqp.py @@ -25,13 +25,19 @@ Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses AMQP, but is deprecated and predates this code. """ +import collections import inspect import sys import uuid from eventlet import greenpool from eventlet import pools +from eventlet import queue from eventlet import semaphore +# TODO(pekowsk): Remove import cfg and below comment in Havana. +# This import should no longer be needed when the amqp_rpc_single_reply_queue +# option is removed. +from oslo.config import cfg from designate.openstack.common import excutils from designate.openstack.common.gettextutils import _ @@ -40,6 +46,17 @@ from designate.openstack.common import log as logging from designate.openstack.common.rpc import common as rpc_common +# TODO(pekowski): Remove this option in Havana. +amqp_opts = [ + cfg.BoolOpt('amqp_rpc_single_reply_queue', + default=False, + help='Enable a fast single reply queue if using AMQP based ' + 'RPC like RabbitMQ or Qpid.'), +] + +cfg.CONF.register_opts(amqp_opts) + +UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) @@ -51,6 +68,7 @@ class Pool(pools.Pool): kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size) kwargs.setdefault("order_as_stack", True) super(Pool, self).__init__(*args, **kwargs) + self.reply_proxy = None # TODO(comstud): Timeout connections not used in a while def create(self): @@ -60,6 +78,16 @@ class Pool(pools.Pool): def empty(self): while self.free_items: self.get().close() + # Force a new connection pool to be created. + # Note that this was added due to failing unit test cases. The issue + # is the above "while loop" gets all the cached connections from the + # pool and closes them, but never returns them to the pool, a pool + # leak. The unit tests hang waiting for an item to be returned to the + # pool. The unit tests get here via the teatDown() method. In the run + # time code, it gets here via cleanup() and only appears in service.py + # just before doing a sys.exit(), so cleanup() only happens once and + # the leakage is not a problem. + self.connection_cls.pool = None _pool_create_sem = semaphore.Semaphore() @@ -137,12 +165,15 @@ class ConnectionContext(rpc_common.Connection): def create_worker(self, topic, proxy, pool_name): self.connection.create_worker(topic, proxy, pool_name) + def join_consumer_pool(self, callback, pool_name, topic, exchange_name): + self.connection.join_consumer_pool(callback, + pool_name, + topic, + exchange_name) + def consume_in_thread(self): self.connection.consume_in_thread() - def consume_in_thread_group(self, thread_group): - self.connection.consume_in_thread_group(thread_group) - def __getattr__(self, key): """Proxy all other calls to the Connection instance""" if self.connection: @@ -151,8 +182,45 @@ class ConnectionContext(rpc_common.Connection): raise rpc_common.InvalidRPCConnectionReuse() -def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, - ending=False, log_failure=True): +class ReplyProxy(ConnectionContext): + """ Connection class for RPC replies / callbacks """ + def __init__(self, conf, connection_pool): + self._call_waiters = {} + self._num_call_waiters = 0 + self._num_call_waiters_wrn_threshhold = 10 + self._reply_q = 'reply_' + uuid.uuid4().hex + super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False) + self.declare_direct_consumer(self._reply_q, self._process_data) + self.consume_in_thread() + + def _process_data(self, message_data): + msg_id = message_data.pop('_msg_id', None) + waiter = self._call_waiters.get(msg_id) + if not waiter: + LOG.warn(_('no calling threads waiting for msg_id : %s' + ', message : %s') % (msg_id, message_data)) + else: + waiter.put(message_data) + + def add_call_waiter(self, waiter, msg_id): + self._num_call_waiters += 1 + if self._num_call_waiters > self._num_call_waiters_wrn_threshhold: + LOG.warn(_('Number of call waiters is greater than warning ' + 'threshhold: %d. There could be a MulticallProxyWaiter ' + 'leak.') % self._num_call_waiters_wrn_threshhold) + self._num_call_waiters_wrn_threshhold *= 2 + self._call_waiters[msg_id] = waiter + + def del_call_waiter(self, msg_id): + self._num_call_waiters -= 1 + del self._call_waiters[msg_id] + + def get_reply_q(self): + return self._reply_q + + +def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None, + failure=None, ending=False, log_failure=True): """Sends a reply or an error on the channel signified by msg_id. Failure should be a sys.exc_info() tuple. @@ -171,13 +239,22 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, 'failure': failure} if ending: msg['ending'] = True - conn.direct_send(msg_id, rpc_common.serialize_msg(msg)) + _add_unique_id(msg) + # If a reply_q exists, add the msg_id to the reply and pass the + # reply_q to direct_send() to use it as the response queue. + # Otherwise use the msg_id for backward compatibilty. + if reply_q: + msg['_msg_id'] = msg_id + conn.direct_send(reply_q, rpc_common.serialize_msg(msg)) + else: + conn.direct_send(msg_id, rpc_common.serialize_msg(msg)) class RpcContext(rpc_common.CommonRpcContext): """Context that supports replying to a rpc.call""" def __init__(self, **kwargs): self.msg_id = kwargs.pop('msg_id', None) + self.reply_q = kwargs.pop('reply_q', None) self.conf = kwargs.pop('conf') super(RpcContext, self).__init__(**kwargs) @@ -185,13 +262,14 @@ class RpcContext(rpc_common.CommonRpcContext): values = self.to_dict() values['conf'] = self.conf values['msg_id'] = self.msg_id + values['reply_q'] = self.reply_q return self.__class__(**values) def reply(self, reply=None, failure=None, ending=False, connection_pool=None, log_failure=True): if self.msg_id: - msg_reply(self.conf, self.msg_id, connection_pool, reply, failure, - ending, log_failure) + msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool, + reply, failure, ending, log_failure) if ending: self.msg_id = None @@ -207,6 +285,7 @@ def unpack_context(conf, msg): value = msg.pop(key) context_dict[key[9:]] = value context_dict['msg_id'] = msg.pop('_msg_id', None) + context_dict['reply_q'] = msg.pop('_reply_q', None) context_dict['conf'] = conf ctx = RpcContext.from_dict(context_dict) rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict()) @@ -227,15 +306,86 @@ def pack_context(msg, context): msg.update(context_d) -class ProxyCallback(object): - """Calls methods on a proxy object based on method and args.""" +class _MsgIdCache(object): + """This class checks any duplicate messages.""" - def __init__(self, conf, proxy, connection_pool): - self.proxy = proxy + # NOTE: This value is considered can be a configuration item, but + # it is not necessary to change its value in most cases, + # so let this value as static for now. + DUP_MSG_CHECK_SIZE = 16 + + def __init__(self, **kwargs): + self.prev_msgids = collections.deque([], + maxlen=self.DUP_MSG_CHECK_SIZE) + + def check_duplicate_message(self, message_data): + """AMQP consumers may read same message twice when exceptions occur + before ack is returned. This method prevents doing it. + """ + if UNIQUE_ID in message_data: + msg_id = message_data[UNIQUE_ID] + if msg_id not in self.prev_msgids: + self.prev_msgids.append(msg_id) + else: + raise rpc_common.DuplicateMessageError(msg_id=msg_id) + + +def _add_unique_id(msg): + """Add unique_id for checking duplicate messages.""" + unique_id = uuid.uuid4().hex + msg.update({UNIQUE_ID: unique_id}) + LOG.debug(_('UNIQUE_ID is %s.') % (unique_id)) + + +class _ThreadPoolWithWait(object): + """Base class for a delayed invocation manager used by + the Connection class to start up green threads + to handle incoming messages. + """ + + def __init__(self, conf, connection_pool): self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size) self.connection_pool = connection_pool self.conf = conf + def wait(self): + """Wait for all callback threads to exit.""" + self.pool.waitall() + + +class CallbackWrapper(_ThreadPoolWithWait): + """Wraps a straight callback to allow it to be invoked in a green + thread. + """ + + def __init__(self, conf, callback, connection_pool): + """ + :param conf: cfg.CONF instance + :param callback: a callable (probably a function) + :param connection_pool: connection pool as returned by + get_connection_pool() + """ + super(CallbackWrapper, self).__init__( + conf=conf, + connection_pool=connection_pool, + ) + self.callback = callback + + def __call__(self, message_data): + self.pool.spawn_n(self.callback, message_data) + + +class ProxyCallback(_ThreadPoolWithWait): + """Calls methods on a proxy object based on method and args.""" + + def __init__(self, conf, proxy, connection_pool): + super(ProxyCallback, self).__init__( + conf=conf, + connection_pool=connection_pool, + ) + self.proxy = proxy + self.msg_id_cache = _MsgIdCache() + def __call__(self, message_data): """Consumer callback to call a method on a proxy object. @@ -254,6 +404,7 @@ class ProxyCallback(object): if hasattr(local.store, 'context'): del local.store.context rpc_common._safe_log(LOG.debug, _('received %s'), message_data) + self.msg_id_cache.check_duplicate_message(message_data) ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) @@ -292,15 +443,73 @@ class ProxyCallback(object): connection_pool=self.connection_pool, log_failure=False) except Exception: - LOG.exception(_('Exception during message handling')) - ctxt.reply(None, sys.exc_info(), - connection_pool=self.connection_pool) - - def wait(self): - """Wait for all callback threads to exit.""" - self.pool.waitall() + # sys.exc_info() is deleted by LOG.exception(). + exc_info = sys.exc_info() + LOG.error(_('Exception during message handling'), + exc_info=exc_info) + ctxt.reply(None, exc_info, connection_pool=self.connection_pool) +class MulticallProxyWaiter(object): + def __init__(self, conf, msg_id, timeout, connection_pool): + self._msg_id = msg_id + self._timeout = timeout or conf.rpc_response_timeout + self._reply_proxy = connection_pool.reply_proxy + self._done = False + self._got_ending = False + self._conf = conf + self._dataqueue = queue.LightQueue() + # Add this caller to the reply proxy's call_waiters + self._reply_proxy.add_call_waiter(self, self._msg_id) + self.msg_id_cache = _MsgIdCache() + + def put(self, data): + self._dataqueue.put(data) + + def done(self): + if self._done: + return + self._done = True + # Remove this caller from reply proxy's call_waiters + self._reply_proxy.del_call_waiter(self._msg_id) + + def _process_data(self, data): + result = None + self.msg_id_cache.check_duplicate_message(data) + if data['failure']: + failure = data['failure'] + result = rpc_common.deserialize_remote_exception(self._conf, + failure) + elif data.get('ending', False): + self._got_ending = True + else: + result = data['result'] + return result + + def __iter__(self): + """Return a result until we get a reply with an 'ending" flag""" + if self._done: + raise StopIteration + while True: + try: + data = self._dataqueue.get(timeout=self._timeout) + result = self._process_data(data) + except queue.Empty: + self.done() + raise rpc_common.Timeout() + except Exception: + with excutils.save_and_reraise_exception(): + self.done() + if self._got_ending: + self.done() + raise StopIteration + if isinstance(result, Exception): + self.done() + raise result + yield result + + +#TODO(pekowski): Remove MulticallWaiter() in Havana. class MulticallWaiter(object): def __init__(self, conf, connection, timeout): self._connection = connection @@ -310,6 +519,7 @@ class MulticallWaiter(object): self._done = False self._got_ending = False self._conf = conf + self.msg_id_cache = _MsgIdCache() def done(self): if self._done: @@ -321,6 +531,7 @@ class MulticallWaiter(object): def __call__(self, data): """The consume() callback will call this. Store the result.""" + self.msg_id_cache.check_duplicate_message(data) if data['failure']: failure = data['failure'] self._result = rpc_common.deserialize_remote_exception(self._conf, @@ -356,22 +567,41 @@ def create_connection(conf, new, connection_pool): return ConnectionContext(conf, connection_pool, pooled=not new) +_reply_proxy_create_sem = semaphore.Semaphore() + + def multicall(conf, context, topic, msg, timeout, connection_pool): """Make a call that returns multiple times.""" + # TODO(pekowski): Remove all these comments in Havana. + # For amqp_rpc_single_reply_queue = False, # Can't use 'with' for multicall, as it returns an iterator # that will continue to use the connection. When it's done, # connection.close() will get called which will put it back into # the pool + # For amqp_rpc_single_reply_queue = True, + # The 'with' statement is mandatory for closing the connection LOG.debug(_('Making synchronous call on %s ...'), topic) msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) LOG.debug(_('MSG_ID is %s') % (msg_id)) + _add_unique_id(msg) pack_context(msg, context) - conn = ConnectionContext(conf, connection_pool) - wait_msg = MulticallWaiter(conf, conn, timeout) - conn.declare_direct_consumer(msg_id, wait_msg) - conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) + # TODO(pekowski): Remove this flag and the code under the if clause + # in Havana. + if not conf.amqp_rpc_single_reply_queue: + conn = ConnectionContext(conf, connection_pool) + wait_msg = MulticallWaiter(conf, conn, timeout) + conn.declare_direct_consumer(msg_id, wait_msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) + else: + with _reply_proxy_create_sem: + if not connection_pool.reply_proxy: + connection_pool.reply_proxy = ReplyProxy(conf, connection_pool) + msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()}) + wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool) + with ConnectionContext(conf, connection_pool) as conn: + conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) return wait_msg @@ -388,6 +618,7 @@ def call(conf, context, topic, msg, timeout, connection_pool): def cast(conf, context, topic, msg, connection_pool): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: conn.topic_send(topic, rpc_common.serialize_msg(msg)) @@ -396,6 +627,7 @@ def cast(conf, context, topic, msg, connection_pool): def fanout_cast(conf, context, topic, msg, connection_pool): """Sends a message on a fanout exchange without waiting for a response.""" LOG.debug(_('Making asynchronous fanout cast...')) + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: conn.fanout_send(topic, rpc_common.serialize_msg(msg)) @@ -403,6 +635,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool): def cast_to_server(conf, context, server_params, topic, msg, connection_pool): """Sends a message on a topic to a specific server.""" + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: @@ -412,6 +645,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): def fanout_cast_to_server(conf, context, server_params, topic, msg, connection_pool): """Sends a message on a fanout exchange to a specific server.""" + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: @@ -423,6 +657,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope): LOG.debug(_('Sending %(event_type)s on %(topic)s'), dict(event_type=msg.get('event_type'), topic=topic)) + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: if envelope: diff --git a/designate/openstack/common/rpc/common.py b/designate/openstack/common/rpc/common.py index 2095c7a0..1d4194ea 100644 --- a/designate/openstack/common/rpc/common.py +++ b/designate/openstack/common/rpc/common.py @@ -21,7 +21,8 @@ import copy import sys import traceback -from designate.openstack.common import cfg +from oslo.config import cfg + from designate.openstack.common.gettextutils import _ from designate.openstack.common import importutils from designate.openstack.common import jsonutils @@ -48,8 +49,8 @@ deserialize_msg(). The current message format (version 2.0) is very simple. It is: { - 'designate.version': , - 'designate.message': + 'oslo.version': , + 'oslo.message': } Message format version '1.0' is just considered to be the messages we sent @@ -65,8 +66,8 @@ to the messaging libraries as a dict. ''' _RPC_ENVELOPE_VERSION = '2.0' -_VERSION_KEY = 'designate.version' -_MESSAGE_KEY = 'designate.message' +_VERSION_KEY = 'oslo.version' +_MESSAGE_KEY = 'oslo.message' # TODO(russellb) Turn this on after Grizzly. @@ -124,6 +125,10 @@ class Timeout(RPCException): message = _("Timeout while waiting on RPC response.") +class DuplicateMessageError(RPCException): + message = _("Found duplicate message(%(msg_id)s). Skipping it.") + + class InvalidRPCConnectionReuse(RPCException): message = _("Invalid reuse of an RPC connection.") @@ -196,23 +201,30 @@ class Connection(object): """ raise NotImplementedError() - def consume_in_thread(self): - """Spawn a thread to handle incoming messages. + def join_consumer_pool(self, callback, pool_name, topic, exchange_name): + """Register as a member of a group of consumers for a given topic from + the specified exchange. - Spawn a thread that will be responsible for handling all incoming - messages for consumers that were set up on this connection. + Exactly one member of a given pool will receive each message. - Message dispatching inside of this is expected to be implemented in a - non-blocking manner. An example implementation would be having this - thread pull messages in for all of the consumers, but utilize a thread - pool for dispatching the messages to the proxy objects. + A message will be delivered to multiple pools, if more than + one is created. + + :param callback: Callable to be invoked for each message. + :type callback: callable accepting one argument + :param pool_name: The name of the consumer pool. + :type pool_name: str + :param topic: The routing topic for desired messages. + :type topic: str + :param exchange_name: The name of the message exchange where + the client should attach. Defaults to + the configured exchange. + :type exchange_name: str """ raise NotImplementedError() - def consume_in_thread_group(self, thread_group): - """ - Spawn a thread to handle incoming messages in the supplied - ThreadGroup. + def consume_in_thread(self): + """Spawn a thread to handle incoming messages. Spawn a thread that will be responsible for handling all incoming messages for consumers that were set up on this connection. @@ -304,7 +316,7 @@ def deserialize_remote_exception(conf, data): # NOTE(ameade): We DO NOT want to allow just any module to be imported, in # order to prevent arbitrary code execution. - if not module in conf.allowed_rpc_exception_modules: + if module not in conf.allowed_rpc_exception_modules: return RemoteError(name, failure.get('message'), trace) try: @@ -313,7 +325,7 @@ def deserialize_remote_exception(conf, data): if not issubclass(klass, Exception): raise TypeError("Can only deserialize Exceptions") - failure = klass(*failure.get('args', []), **failure.get('kwargs', {})) + failure = klass(**failure.get('kwargs', {})) except (AttributeError, TypeError, ImportError): return RemoteError(name, failure.get('message'), trace) diff --git a/designate/openstack/common/rpc/impl_fake.py b/designate/openstack/common/rpc/impl_fake.py index 1d244a1e..2f4ce858 100644 --- a/designate/openstack/common/rpc/impl_fake.py +++ b/designate/openstack/common/rpc/impl_fake.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC +# Copyright 2011 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -119,9 +119,6 @@ class Connection(object): def consume_in_thread(self): pass - def consume_in_thread_group(self, thread_group): - pass - def create_connection(conf, new=True): """Create a connection""" diff --git a/designate/openstack/common/rpc/impl_kombu.py b/designate/openstack/common/rpc/impl_kombu.py index 7c43b4ef..97e3698b 100644 --- a/designate/openstack/common/rpc/impl_kombu.py +++ b/designate/openstack/common/rpc/impl_kombu.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC +# Copyright 2011 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -28,8 +28,8 @@ import kombu import kombu.connection import kombu.entity import kombu.messaging +from oslo.config import cfg -from designate.openstack.common import cfg from designate.openstack.common.gettextutils import _ from designate.openstack.common import network_utils from designate.openstack.common.rpc import amqp as rpc_amqp @@ -66,7 +66,8 @@ kombu_opts = [ help='the RabbitMQ userid'), cfg.StrOpt('rabbit_password', default='guest', - help='the RabbitMQ password'), + help='the RabbitMQ password', + secret=True), cfg.StrOpt('rabbit_virtual_host', default='/', help='the RabbitMQ virtual host'), @@ -164,9 +165,10 @@ class ConsumerBase(object): try: msg = rpc_common.deserialize_msg(message.payload) callback(msg) - message.ack() except Exception: LOG.exception(_("Failed to process message... skipping it.")) + finally: + message.ack() self.queue.consume(*args, callback=_callback, **options) @@ -196,6 +198,7 @@ class DirectConsumer(ConsumerBase): """ # Default options options = {'durable': False, + 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': True, 'exclusive': False} options.update(kwargs) @@ -621,8 +624,8 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, socket.timeout): - LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + LOG.debug(_('Timed out waiting for RPC response: %s') % + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % @@ -718,25 +721,17 @@ class Connection(object): except StopIteration: return - def _consumer_thread_callback(self): - """ Consumer thread callback used by consume_in_* """ - try: - self.consume() - except greenlet.GreenletExit: - return - def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread""" - + def _consumer_thread(): + try: + self.consume() + except greenlet.GreenletExit: + return if self.consumer_thread is None: - self.consumer_thread = eventlet.spawn( - self._consumer_thread_callback) + self.consumer_thread = eventlet.spawn(_consumer_thread) return self.consumer_thread - def consume_in_thread_group(self, thread_group): - """ Consume from all queues/consumers in the supplied ThreadGroup""" - thread_group.add_thread(self._consumer_thread_callback) - def create_consumer(self, topic, proxy, fanout=False): """Create a consumer that calls a method in a proxy object""" proxy_cb = rpc_amqp.ProxyCallback( @@ -757,6 +752,30 @@ class Connection(object): self.proxy_callbacks.append(proxy_cb) self.declare_topic_consumer(topic, proxy_cb, pool_name) + def join_consumer_pool(self, callback, pool_name, topic, + exchange_name=None): + """Register as a member of a group of consumers for a given topic from + the specified exchange. + + Exactly one member of a given pool will receive each message. + + A message will be delivered to multiple pools, if more than + one is created. + """ + callback_wrapper = rpc_amqp.CallbackWrapper( + conf=self.conf, + callback=callback, + connection_pool=rpc_amqp.get_connection_pool(self.conf, + Connection), + ) + self.proxy_callbacks.append(callback_wrapper) + self.declare_topic_consumer( + queue_name=pool_name, + topic=topic, + exchange_name=exchange_name, + callback=callback_wrapper, + ) + def create_connection(conf, new=True): """Create a connection""" diff --git a/designate/openstack/common/rpc/impl_qpid.py b/designate/openstack/common/rpc/impl_qpid.py index eb3869c1..f3f44513 100644 --- a/designate/openstack/common/rpc/impl_qpid.py +++ b/designate/openstack/common/rpc/impl_qpid.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC +# Copyright 2011 OpenStack Foundation # Copyright 2011 - 2012, Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -22,8 +22,8 @@ import uuid import eventlet import greenlet +from oslo.config import cfg -from designate.openstack.common import cfg from designate.openstack.common.gettextutils import _ from designate.openstack.common import importutils from designate.openstack.common import jsonutils @@ -40,8 +40,8 @@ qpid_opts = [ cfg.StrOpt('qpid_hostname', default='localhost', help='Qpid broker hostname'), - cfg.StrOpt('qpid_port', - default='5672', + cfg.IntOpt('qpid_port', + default=5672, help='Qpid broker port'), cfg.ListOpt('qpid_hosts', default=['$qpid_hostname:$qpid_port'], @@ -51,7 +51,8 @@ qpid_opts = [ help='Username for qpid connection'), cfg.StrOpt('qpid_password', default='', - help='Password for qpid connection'), + help='Password for qpid connection', + secret=True), cfg.StrOpt('qpid_sasl_mechanisms', default='', help='Space separated list of SASL mechanisms to use for auth'), @@ -68,6 +69,8 @@ qpid_opts = [ cfg.CONF.register_opts(qpid_opts) +JSON_CONTENT_TYPE = 'application/json; charset=utf8' + class ConsumerBase(object): """Consumer base class.""" @@ -122,10 +125,27 @@ class ConsumerBase(object): self.receiver = session.receiver(self.address) self.receiver.capacity = 1 + def _unpack_json_msg(self, msg): + """Load the JSON data in msg if msg.content_type indicates that it + is necessary. Put the loaded data back into msg.content and + update msg.content_type appropriately. + + A Qpid Message containing a dict will have a content_type of + 'amqp/map', whereas one containing a string that needs to be converted + back from JSON will have a content_type of JSON_CONTENT_TYPE. + + :param msg: a Qpid Message object + :returns: None + """ + if msg.content_type == JSON_CONTENT_TYPE: + msg.content = jsonutils.loads(msg.content) + msg.content_type = 'amqp/map' + def consume(self): """Fetch the message and pass it to the callback object""" message = self.receiver.fetch() try: + self._unpack_json_msg(message) msg = rpc_common.deserialize_msg(message.content) self.callback(msg) except Exception: @@ -330,15 +350,16 @@ class Connection(object): def reconnect(self): """Handles reconnecting and re-establishing sessions and queues""" - if self.connection.opened(): - try: - self.connection.close() - except qpid_exceptions.ConnectionError: - pass - attempt = 0 delay = 1 while True: + # Close the session if necessary + if self.connection.opened(): + try: + self.connection.close() + except qpid_exceptions.ConnectionError: + pass + broker = self.brokers[attempt % len(self.brokers)] attempt += 1 @@ -414,8 +435,8 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, qpid_exceptions.Empty): - LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + LOG.debug(_('Timed out waiting for RPC response: %s') % + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % @@ -509,13 +530,6 @@ class Connection(object): """Send a notify message on a topic""" self.publisher_send(NotifyPublisher, topic, msg) - def _consumer_thread_callback(self): - """ Consumer thread callback used by consume_in_* """ - try: - self.consume() - except greenlet.GreenletExit: - return - def consume(self, limit=None): """Consume from all queues/consumers""" it = self.iterconsume(limit=limit) @@ -527,16 +541,15 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread""" - + def _consumer_thread(): + try: + self.consume() + except greenlet.GreenletExit: + return if self.consumer_thread is None: - self.consumer_thread = eventlet.spawn( - self._consumer_thread_callback) + self.consumer_thread = eventlet.spawn(_consumer_thread) return self.consumer_thread - def consume_in_thread_group(self, thread_group): - """ Consume from all queues/consumers in the supplied ThreadGroup""" - thread_group.add_thread(self._consumer_thread_callback) - def create_consumer(self, topic, proxy, fanout=False): """Create a consumer that calls a method in a proxy object""" proxy_cb = rpc_amqp.ProxyCallback( @@ -567,6 +580,34 @@ class Connection(object): return consumer + def join_consumer_pool(self, callback, pool_name, topic, + exchange_name=None): + """Register as a member of a group of consumers for a given topic from + the specified exchange. + + Exactly one member of a given pool will receive each message. + + A message will be delivered to multiple pools, if more than + one is created. + """ + callback_wrapper = rpc_amqp.CallbackWrapper( + conf=self.conf, + callback=callback, + connection_pool=rpc_amqp.get_connection_pool(self.conf, + Connection), + ) + self.proxy_callbacks.append(callback_wrapper) + + consumer = TopicConsumer(conf=self.conf, + session=self.session, + topic=topic, + callback=callback_wrapper, + name=pool_name, + exchange_name=exchange_name) + + self._register_consumer(consumer) + return consumer + def create_connection(conf, new=True): """Create a connection""" diff --git a/designate/openstack/common/rpc/impl_zmq.py b/designate/openstack/common/rpc/impl_zmq.py index ecc45fc1..81196b05 100644 --- a/designate/openstack/common/rpc/impl_zmq.py +++ b/designate/openstack/common/rpc/impl_zmq.py @@ -16,16 +16,17 @@ import os import pprint +import re import socket -import string import sys import types import uuid import eventlet import greenlet +from oslo.config import cfg -from designate.openstack.common import cfg +from designate.openstack.common import excutils from designate.openstack.common.gettextutils import _ from designate.openstack.common import importutils from designate.openstack.common import jsonutils @@ -90,10 +91,10 @@ def _serialize(data): Error if a developer passes us bad data. """ try: - return str(jsonutils.dumps(data, ensure_ascii=True)) + return jsonutils.dumps(data, ensure_ascii=True) except TypeError: - LOG.error(_("JSON serialization failed.")) - raise + with excutils.save_and_reraise_exception(): + LOG.error(_("JSON serialization failed.")) def _deserialize(data): @@ -217,11 +218,18 @@ class ZmqClient(object): socket_type = zmq.PUSH self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): - if serialize: - data = rpc_common.serialize_msg(data, force_envelope) - self.outq.send([str(msg_id), str(topic), str('cast'), - _serialize(data)]) + def cast(self, msg_id, topic, data, envelope=False): + msg_id = msg_id or 0 + + if not (envelope or rpc_common._SEND_RPC_ENVELOPE): + self.outq.send(map(bytes, + (msg_id, topic, 'cast', _serialize(data)))) + return + + rpc_envelope = rpc_common.serialize_msg(data[1], envelope) + zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items()) + self.outq.send(map(bytes, + (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg)) def close(self): self.outq.close() @@ -295,13 +303,13 @@ class InternalContext(object): ctx.replies) LOG.debug(_("Sending reply")) - cast(CONF, ctx, topic, { + _multi_send(_cast, ctx, topic, { 'method': '-process_reply', 'args': { - 'msg_id': msg_id, + 'msg_id': msg_id, # Include for Folsom compat. 'response': response } - }) + }, _msg_id=msg_id) class ConsumerBase(object): @@ -320,22 +328,23 @@ class ConsumerBase(object): else: return [result] - def process(self, style, target, proxy, ctx, data): + def process(self, proxy, ctx, data): + data.setdefault('version', None) + data.setdefault('args', {}) + # Method starting with - are # processed internally. (non-valid method name) - method = data['method'] + method = data.get('method') + if not method: + LOG.error(_("RPC message did not include method.")) + return # Internal method # uses internal context for safety. - if data['method'][0] == '-': - # For reply / process_reply - method = method[1:] - if method == 'reply': - self.private_ctx.reply(ctx, proxy, **data['args']) + if method == '-reply': + self.private_ctx.reply(ctx, proxy, **data['args']) return - data.setdefault('version', None) - data.setdefault('args', {}) proxy.dispatch(ctx, data['version'], data['method'], **data['args']) @@ -391,24 +400,17 @@ class ZmqBaseReactor(ConsumerBase): LOG.info(_("Out reactor registered")) - def _consumer_thread_callback(self, sock): - """ Consumer thread callback used by consume_in_* """ - - LOG.info(_("Consuming socket")) - while True: - self.consume(sock) - def consume_in_thread(self): + def _consume(sock): + LOG.info(_("Consuming socket")) + while True: + self.consume(sock) + for k in self.proxies.keys(): self.threads.append( - self.pool.spawn(self._consumer_thread_callback, k) + self.pool.spawn(_consume, k) ) - def consume_in_thread_group(self, thread_group): - """ Consume from all queues/consumers in the supplied ThreadGroup""" - for k in self.proxies.keys(): - thread_group.add_thread(self._consumer_thread_callback, k) - def wait(self): for t in self.threads: t.wait() @@ -430,6 +432,8 @@ class ZmqProxy(ZmqBaseReactor): def __init__(self, conf): super(ZmqProxy, self).__init__(conf) + pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\')) + self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep))) self.topic_proxy = {} @@ -438,21 +442,15 @@ class ZmqProxy(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - msg_id, topic, style, in_msg = data - topic = topic.split('.', 1)[0] + topic = data[1] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) - # Handle zmq_replies magic if topic.startswith('fanout~'): sock_type = zmq.PUB + topic = topic.split('.', 1)[0] elif topic.startswith('zmq_replies'): sock_type = zmq.PUB - inside = rpc_common.deserialize_msg(_deserialize(in_msg)) - msg_id = inside[-1]['args']['msg_id'] - response = inside[-1]['args']['response'] - LOG.debug(_("->response->%s"), response) - data = [str(msg_id), _serialize(response)] else: sock_type = zmq.PUSH @@ -461,6 +459,13 @@ class ZmqProxy(ZmqBaseReactor): LOG.info(_("Creating proxy for topic: %s"), topic) try: + # The topic is received over the network, + # don't trust this input. + if self.badchars.search(topic) is not None: + emsg = _("Topic contained dangerous characters.") + LOG.warn(emsg) + raise RPCException(emsg) + out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), sock_type, bind=True) @@ -517,9 +522,9 @@ class ZmqProxy(ZmqBaseReactor): ipc_dir, run_as_root=True) utils.execute('chmod', '750', ipc_dir, run_as_root=True) except utils.ProcessExecutionError: - LOG.error(_("Could not create IPC directory %s") % - (ipc_dir, )) - raise + with excutils.save_and_reraise_exception(): + LOG.error(_("Could not create IPC directory %s") % + (ipc_dir, )) try: self.register(consumption_proxy, @@ -527,13 +532,28 @@ class ZmqProxy(ZmqBaseReactor): zmq.PULL, out_bind=True) except zmq.ZMQError: - LOG.error(_("Could not create ZeroMQ receiver daemon. " - "Socket may already be in use.")) - raise + with excutils.save_and_reraise_exception(): + LOG.error(_("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) super(ZmqProxy, self).consume_in_thread() +def unflatten_envelope(packenv): + """Unflattens the RPC envelope. + Takes a list and returns a dictionary. + i.e. [1,2,3,4] => {1: 2, 3: 4} + """ + i = iter(packenv) + h = {} + try: + while True: + k = i.next() + h[k] = i.next() + except StopIteration: + return h + + class ZmqReactor(ZmqBaseReactor): """ A consumer class implementing a @@ -554,38 +574,53 @@ class ZmqReactor(ZmqBaseReactor): self.mapping[sock].send(data) return - msg_id, topic, style, in_msg = data - - ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg)) - ctx = RpcContext.unmarshal(ctx) - proxy = self.proxies[sock] - self.pool.spawn_n(self.process, style, topic, - proxy, ctx, request) + if data[2] == 'cast': # Legacy protocol + packenv = data[3] + + ctx, msg = _deserialize(packenv) + request = rpc_common.deserialize_msg(msg) + ctx = RpcContext.unmarshal(ctx) + elif data[2] == 'impl_zmq_v2': + packenv = data[4:] + + msg = unflatten_envelope(packenv) + request = rpc_common.deserialize_msg(msg) + + # Unmarshal only after verifying the message. + ctx = RpcContext.unmarshal(data[3]) + else: + LOG.error(_("ZMQ Envelope version unsupported or unknown.")) + return + + self.pool.spawn_n(self.process, proxy, ctx, request) class Connection(rpc_common.Connection): """Manages connections and threads.""" def __init__(self, conf): + self.topics = [] self.reactor = ZmqReactor(conf) def create_consumer(self, topic, proxy, fanout=False): - # Only consume on the base topic name. - topic = topic.split('.', 1)[0] - - LOG.info(_("Create Consumer for topic (%(topic)s)") % - {'topic': topic}) + # Register with matchmaker. + _get_matchmaker().register(topic, CONF.rpc_zmq_host) # Subscription scenarios if fanout: - subscribe = ('', fanout)[type(fanout) == str] sock_type = zmq.SUB - topic = 'fanout~' + topic + subscribe = ('', fanout)[type(fanout) == str] + topic = 'fanout~' + topic.split('.', 1)[0] else: sock_type = zmq.PULL subscribe = None + topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host)) + + if topic in self.topics: + LOG.info(_("Skipping topic registration. Already registered.")) + return # Receive messages from (local) proxy inaddr = "ipc://%s/zmq_topic_%s" % \ @@ -596,22 +631,26 @@ class Connection(rpc_common.Connection): self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) + self.topics.append(topic) def close(self): + _get_matchmaker().stop_heartbeat() + for topic in self.topics: + _get_matchmaker().unregister(topic, CONF.rpc_zmq_host) + self.reactor.close() + self.topics = [] def wait(self): self.reactor.wait() def consume_in_thread(self): + _get_matchmaker().start_heartbeat() self.reactor.consume_in_thread() - def consume_in_thread_group(self, thread_group): - self.reactor.consume_in_thread_group(thread_group) - -def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, - force_envelope=False): +def _cast(addr, context, topic, msg, timeout=None, envelope=False, + _msg_id=None): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -620,7 +659,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload, serialize, force_envelope) + conn.cast(_msg_id, topic, payload, envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -628,8 +667,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, conn.close() -def _call(addr, context, msg_id, topic, msg, timeout=None, - serialize=True, force_envelope=False): +def _call(addr, context, topic, msg, timeout=None, + envelope=False): # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -659,23 +698,36 @@ def _call(addr, context, msg_id, topic, msg, timeout=None, with Timeout(timeout, exception=rpc_common.Timeout): try: msg_waiter = ZmqSocket( - "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir, + "ipc://%s/zmq_topic_zmq_replies.%s" % + (CONF.rpc_zmq_ipc_dir, + CONF.rpc_zmq_host), zmq.SUB, subscribe=msg_id, bind=False ) LOG.debug(_("Sending cast")) - _cast(addr, context, msg_id, topic, payload, - serialize=serialize, force_envelope=force_envelope) + _cast(addr, context, topic, payload, envelope) LOG.debug(_("Cast sent; Waiting reply")) # Blocks until receives reply msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = _deserialize(msg[-1]) + + if msg[2] == 'cast': # Legacy version + raw_msg = _deserialize(msg[-1])[-1] + elif msg[2] == 'impl_zmq_v2': + rpc_envelope = unflatten_envelope(msg[4:]) + raw_msg = rpc_common.deserialize_msg(rpc_envelope) + else: + raise rpc_common.UnsupportedRpcEnvelopeVersion( + _("Unsupported or unknown ZMQ envelope returned.")) + + responses = raw_msg['args']['response'] # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") + except (IndexError, KeyError): + raise RPCException(_("RPC Message Invalid.")) finally: if 'msg_waiter' in vars(): msg_waiter.close() @@ -691,8 +743,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None, return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None, serialize=True, - force_envelope=False): +def _multi_send(method, context, topic, msg, timeout=None, + envelope=False, _msg_id=None): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -709,7 +761,7 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, LOG.warn(_("No matchmaker results. Not casting.")) # While not strictly a timeout, callers know how to handle # this exception and a timeout isn't too big a lie. - raise rpc_common.Timeout, "No match from matchmaker." + raise rpc_common.Timeout(_("No match from matchmaker.")) # This supports brokerless fanout (addresses > 1) for queue in queues: @@ -718,11 +770,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout, serialize, - force_envelope) + _topic, msg, timeout, envelope, + _msg_id) return - return method(_addr, context, _topic, _topic, msg, timeout, - serialize, force_envelope) + return method(_addr, context, _topic, msg, timeout, + envelope) def create_connection(conf, new=True): @@ -752,7 +804,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs): _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs) -def notify(conf, context, topic, msg, **kwargs): +def notify(conf, context, topic, msg, envelope): """ Send notification event. Notifications are sent to topic-priority. @@ -760,10 +812,8 @@ def notify(conf, context, topic, msg, **kwargs): """ # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. - topic.replace('.', '-') - kwargs['serialize'] = kwargs.pop('envelope') - kwargs['force_envelope'] = True - cast(conf, context, topic, msg, **kwargs) + topic = topic.replace('.', '-') + cast(conf, context, topic, msg, envelope=envelope) def cleanup(): @@ -787,21 +837,9 @@ def _get_ctxt(): return ZMQ_CTX -def _get_matchmaker(): +def _get_matchmaker(*args, **kwargs): global matchmaker if not matchmaker: - # rpc_zmq_matchmaker should be set to a 'module.Class' - mm_path = CONF.rpc_zmq_matchmaker.split('.') - mm_module = '.'.join(mm_path[:-1]) - mm_class = mm_path[-1] - - # Only initialize a class. - if mm_path[-1][0] not in string.ascii_uppercase: - LOG.error(_("Matchmaker could not be loaded.\n" - "rpc_zmq_matchmaker is not a class.")) - raise RPCException(_("Error loading Matchmaker.")) - - mm_impl = importutils.import_module(mm_module) - mm_constructor = getattr(mm_impl, mm_class) - matchmaker = mm_constructor() + matchmaker = importutils.import_object( + CONF.rpc_zmq_matchmaker, *args, **kwargs) return matchmaker diff --git a/designate/openstack/common/rpc/matchmaker.py b/designate/openstack/common/rpc/matchmaker.py index 8faaae35..de532c8b 100644 --- a/designate/openstack/common/rpc/matchmaker.py +++ b/designate/openstack/common/rpc/matchmaker.py @@ -22,7 +22,9 @@ import contextlib import itertools import json -from designate.openstack.common import cfg +import eventlet +from oslo.config import cfg + from designate.openstack.common.gettextutils import _ from designate.openstack.common import log as logging @@ -32,6 +34,12 @@ matchmaker_opts = [ cfg.StrOpt('matchmaker_ringfile', default='/etc/nova/matchmaker_ring.json', help='Matchmaker ring file (JSON)'), + cfg.IntOpt('matchmaker_heartbeat_freq', + default=300, + help='Heartbeat frequency'), + cfg.IntOpt('matchmaker_heartbeat_ttl', + default=600, + help='Heartbeat time-to-live.'), ] CONF = cfg.CONF @@ -69,12 +77,73 @@ class Binding(object): class MatchMakerBase(object): - """Match Maker Base Class.""" - + """ + Match Maker Base Class. + Build off HeartbeatMatchMakerBase if building a + heartbeat-capable MatchMaker. + """ def __init__(self): # Array of tuples. Index [2] toggles negation, [3] is last-if-true self.bindings = [] + self.no_heartbeat_msg = _('Matchmaker does not implement ' + 'registration or heartbeat.') + + def register(self, key, host): + """ + Register a host on a backend. + Heartbeats, if applicable, may keepalive registration. + """ + pass + + def ack_alive(self, key, host): + """ + Acknowledge that a key.host is alive. + Used internally for updating heartbeats, + but may also be used publically to acknowledge + a system is alive (i.e. rpc message successfully + sent to host) + """ + pass + + def is_alive(self, topic, host): + """ + Checks if a host is alive. + """ + pass + + def expire(self, topic, host): + """ + Explicitly expire a host's registration. + """ + pass + + def send_heartbeats(self): + """ + Send all heartbeats. + Use start_heartbeat to spawn a heartbeat greenthread, + which loops this method. + """ + pass + + def unregister(self, key, host): + """ + Unregister a topic. + """ + pass + + def start_heartbeat(self): + """ + Spawn heartbeat greenthread. + """ + pass + + def stop_heartbeat(self): + """ + Destroys the heartbeat greenthread. + """ + pass + def add_binding(self, binding, rule, last=True): self.bindings.append((binding, rule, False, last)) @@ -98,6 +167,103 @@ class MatchMakerBase(object): return workers +class HeartbeatMatchMakerBase(MatchMakerBase): + """ + Base for a heart-beat capable MatchMaker. + Provides common methods for registering, + unregistering, and maintaining heartbeats. + """ + def __init__(self): + self.hosts = set() + self._heart = None + self.host_topic = {} + + super(HeartbeatMatchMakerBase, self).__init__() + + def send_heartbeats(self): + """ + Send all heartbeats. + Use start_heartbeat to spawn a heartbeat greenthread, + which loops this method. + """ + for key, host in self.host_topic: + self.ack_alive(key, host) + + def ack_alive(self, key, host): + """ + Acknowledge that a host.topic is alive. + Used internally for updating heartbeats, + but may also be used publically to acknowledge + a system is alive (i.e. rpc message successfully + sent to host) + """ + raise NotImplementedError("Must implement ack_alive") + + def backend_register(self, key, host): + """ + Implements registration logic. + Called by register(self,key,host) + """ + raise NotImplementedError("Must implement backend_register") + + def backend_unregister(self, key, key_host): + """ + Implements de-registration logic. + Called by unregister(self,key,host) + """ + raise NotImplementedError("Must implement backend_unregister") + + def register(self, key, host): + """ + Register a host on a backend. + Heartbeats, if applicable, may keepalive registration. + """ + self.hosts.add(host) + self.host_topic[(key, host)] = host + key_host = '.'.join((key, host)) + + self.backend_register(key, key_host) + + self.ack_alive(key, host) + + def unregister(self, key, host): + """ + Unregister a topic. + """ + if (key, host) in self.host_topic: + del self.host_topic[(key, host)] + + self.hosts.discard(host) + self.backend_unregister(key, '.'.join((key, host))) + + LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host))) + + def start_heartbeat(self): + """ + Implementation of MatchMakerBase.start_heartbeat + Launches greenthread looping send_heartbeats(), + yielding for CONF.matchmaker_heartbeat_freq seconds + between iterations. + """ + if len(self.hosts) == 0: + raise MatchMakerException( + _("Register before starting heartbeat.")) + + def do_heartbeat(): + while True: + self.send_heartbeats() + eventlet.sleep(CONF.matchmaker_heartbeat_freq) + + self._heart = eventlet.spawn(do_heartbeat) + + def stop_heartbeat(self): + """ + Destroys the heartbeat greenthread. + """ + if self._heart: + self._heart.kill() + + class DirectBinding(Binding): """ Specifies a host in the key via a '.' character @@ -201,24 +367,25 @@ class FanoutRingExchange(RingExchange): class LocalhostExchange(Exchange): """Exchange where all direct topics are local.""" - def __init__(self): + def __init__(self, host='localhost'): + self.host = host super(Exchange, self).__init__() def run(self, key): - return [(key.split('.')[0] + '.localhost', 'localhost')] + return [('.'.join((key.split('.')[0], self.host)), self.host)] class DirectExchange(Exchange): """ Exchange where all topic keys are split, sending to second half. - i.e. "compute.host" sends a message to "compute" running on "host" + i.e. "compute.host" sends a message to "compute.host" running on "host" """ def __init__(self): super(Exchange, self).__init__() def run(self, key): - b, e = key.split('.', 1) - return [(b, e)] + e = key.split('.', 1)[1] + return [(key, e)] class MatchMakerRing(MatchMakerBase): @@ -237,11 +404,11 @@ class MatchMakerLocalhost(MatchMakerBase): Match Maker where all bare topics resolve to localhost. Useful for testing. """ - def __init__(self): + def __init__(self, host='localhost'): super(MatchMakerLocalhost, self).__init__() - self.add_binding(FanoutBinding(), LocalhostExchange()) + self.add_binding(FanoutBinding(), LocalhostExchange(host)) self.add_binding(DirectBinding(), DirectExchange()) - self.add_binding(TopicBinding(), LocalhostExchange()) + self.add_binding(TopicBinding(), LocalhostExchange(host)) class MatchMakerStub(MatchMakerBase): diff --git a/designate/openstack/common/rpc/matchmaker_redis.py b/designate/openstack/common/rpc/matchmaker_redis.py new file mode 100644 index 00000000..f2142161 --- /dev/null +++ b/designate/openstack/common/rpc/matchmaker_redis.py @@ -0,0 +1,149 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +The MatchMaker classes should accept a Topic or Fanout exchange key and +return keys for direct exchanges, per (approximate) AMQP parlance. +""" + +from oslo.config import cfg + +from designate.openstack.common import importutils +from designate.openstack.common import log as logging +from designate.openstack.common.rpc import matchmaker as mm_common + +redis = importutils.try_import('redis') + + +matchmaker_redis_opts = [ + cfg.StrOpt('host', + default='127.0.0.1', + help='Host to locate redis'), + cfg.IntOpt('port', + default=6379, + help='Use this port to connect to redis host.'), + cfg.StrOpt('password', + default=None, + help='Password for Redis server. (optional)'), +] + +CONF = cfg.CONF +opt_group = cfg.OptGroup(name='matchmaker_redis', + title='Options for Redis-based MatchMaker') +CONF.register_group(opt_group) +CONF.register_opts(matchmaker_redis_opts, opt_group) +LOG = logging.getLogger(__name__) + + +class RedisExchange(mm_common.Exchange): + def __init__(self, matchmaker): + self.matchmaker = matchmaker + self.redis = matchmaker.redis + super(RedisExchange, self).__init__() + + +class RedisTopicExchange(RedisExchange): + """ + Exchange where all topic keys are split, sending to second half. + i.e. "compute.host" sends a message to "compute" running on "host" + """ + def run(self, topic): + while True: + member_name = self.redis.srandmember(topic) + + if not member_name: + # If this happens, there are no + # longer any members. + break + + if not self.matchmaker.is_alive(topic, member_name): + continue + + host = member_name.split('.', 1)[1] + return [(member_name, host)] + return [] + + +class RedisFanoutExchange(RedisExchange): + """ + Return a list of all hosts. + """ + def run(self, topic): + topic = topic.split('~', 1)[1] + hosts = self.redis.smembers(topic) + good_hosts = filter( + lambda host: self.matchmaker.is_alive(topic, host), hosts) + + return [(x, x.split('.', 1)[1]) for x in good_hosts] + + +class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase): + """ + MatchMaker registering and looking-up hosts with a Redis server. + """ + def __init__(self): + super(MatchMakerRedis, self).__init__() + + if not redis: + raise ImportError("Failed to import module redis.") + + self.redis = redis.StrictRedis( + host=CONF.matchmaker_redis.host, + port=CONF.matchmaker_redis.port, + password=CONF.matchmaker_redis.password) + + self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self)) + self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange()) + self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self)) + + def ack_alive(self, key, host): + topic = "%s.%s" % (key, host) + if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl): + # If we could not update the expiration, the key + # might have been pruned. Re-register, creating a new + # key in Redis. + self.register(self.topic_host[host], host) + + def is_alive(self, topic, host): + if self.redis.ttl(host) == -1: + self.expire(topic, host) + return False + return True + + def expire(self, topic, host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.delete(host) + pipe.srem(topic, host) + pipe.execute() + + def backend_register(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.sadd(key, key_host) + + # No value is needed, we just + # care if it exists. Sets aren't viable + # because only keys can expire. + pipe.set(key_host, '') + + pipe.execute() + + def backend_unregister(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.srem(key, key_host) + pipe.delete(key_host) + pipe.execute() diff --git a/designate/openstack/common/rpc/service.py b/designate/openstack/common/rpc/service.py index a85a3166..a0d07219 100644 --- a/designate/openstack/common/rpc/service.py +++ b/designate/openstack/common/rpc/service.py @@ -63,7 +63,7 @@ class Service(service.Service): self.manager.initialize_service_hook(self) # Consume from all consumers in a thread - self.conn.consume_in_thread_group(self.tg) + self.conn.consume_in_thread() def stop(self): # Try to shut the connection down, but if we get any sort of diff --git a/designate/openstack/common/service.py b/designate/openstack/common/service.py index 5ee71959..75394bc7 100644 --- a/designate/openstack/common/service.py +++ b/designate/openstack/common/service.py @@ -28,8 +28,8 @@ import time import eventlet import logging as std_logging +from oslo.config import cfg -from designate.openstack.common import cfg from designate.openstack.common import eventlet_backdoor from designate.openstack.common.gettextutils import _ from designate.openstack.common import importutils diff --git a/designate/openstack/common/sslutils.py b/designate/openstack/common/sslutils.py new file mode 100644 index 00000000..e264ef62 --- /dev/null +++ b/designate/openstack/common/sslutils.py @@ -0,0 +1,80 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 IBM +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import ssl + +from oslo.config import cfg + +from designate.openstack.common.gettextutils import _ + + +ssl_opts = [ + cfg.StrOpt('ca_file', + default=None, + help="CA certificate file to use to verify " + "connecting clients"), + cfg.StrOpt('cert_file', + default=None, + help="Certificate file to use when starting " + "the server securely"), + cfg.StrOpt('key_file', + default=None, + help="Private key file to use when starting " + "the server securely"), +] + + +CONF = cfg.CONF +CONF.register_opts(ssl_opts, "ssl") + + +def is_enabled(): + cert_file = CONF.ssl.cert_file + key_file = CONF.ssl.key_file + ca_file = CONF.ssl.ca_file + use_ssl = cert_file or key_file + + if cert_file and not os.path.exists(cert_file): + raise RuntimeError(_("Unable to find cert_file : %s") % cert_file) + + if ca_file and not os.path.exists(ca_file): + raise RuntimeError(_("Unable to find ca_file : %s") % ca_file) + + if key_file and not os.path.exists(key_file): + raise RuntimeError(_("Unable to find key_file : %s") % key_file) + + if use_ssl and (not cert_file or not key_file): + raise RuntimeError(_("When running server in SSL mode, you must " + "specify both a cert_file and key_file " + "option value in your configuration file")) + + return use_ssl + + +def wrap(sock): + ssl_kwargs = { + 'server_side': True, + 'certfile': CONF.ssl.cert_file, + 'keyfile': CONF.ssl.key_file, + 'cert_reqs': ssl.CERT_NONE, + } + + if CONF.ssl.ca_file: + ssl_kwargs['ca_certs'] = CONF.ssl.ca_file + ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED + + return ssl.wrap_socket(sock, **ssl_kwargs) diff --git a/designate/openstack/common/threadgroup.py b/designate/openstack/common/threadgroup.py index e337db1b..fbf6916b 100644 --- a/designate/openstack/common/threadgroup.py +++ b/designate/openstack/common/threadgroup.py @@ -63,7 +63,7 @@ class ThreadGroup(object): def add_timer(self, interval, callback, initial_delay=None, *args, **kwargs): - pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs) + pulse = loopingcall.LoopingCall(callback, *args, **kwargs) pulse.start(interval=interval, initial_delay=initial_delay) self.timers.append(pulse) diff --git a/designate/openstack/common/timeutils.py b/designate/openstack/common/timeutils.py index 5a011e81..60943659 100644 --- a/designate/openstack/common/timeutils.py +++ b/designate/openstack/common/timeutils.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -25,18 +25,22 @@ import datetime import iso8601 -TIME_FORMAT = "%Y-%m-%dT%H:%M:%S" -PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" +# ISO 8601 extended time format with microseconds +_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f' +_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S' +PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND -def isotime(at=None): +def isotime(at=None, subsecond=False): """Stringify time in ISO 8601 format""" if not at: at = utcnow() - str = at.strftime(TIME_FORMAT) + st = at.strftime(_ISO8601_TIME_FORMAT + if not subsecond + else _ISO8601_TIME_FORMAT_SUBSECOND) tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC' - str += ('Z' if tz == 'UTC' else tz) - return str + st += ('Z' if tz == 'UTC' else tz) + return st def parse_isotime(timestr): @@ -179,4 +183,4 @@ def is_soon(dt, window): :return: True if expiration is within the given duration """ soon = (utcnow() + datetime.timedelta(seconds=window)) - return normalize_time(dt) < soon + return normalize_time(dt) <= soon diff --git a/designate/openstack/common/wsgi.py b/designate/openstack/common/wsgi.py index ec6472ab..38a7c6e9 100644 --- a/designate/openstack/common/wsgi.py +++ b/designate/openstack/common/wsgi.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -17,15 +17,19 @@ """Utility methods for working with WSGI servers.""" -import datetime import eventlet -import eventlet.wsgi - eventlet.patcher.monkey_patch(all=False, socket=True) +import datetime +import errno +import socket +import sys +import time + +import eventlet.wsgi +from oslo.config import cfg import routes import routes.middleware -import sys import webob.dec import webob.exc from xml.dom import minidom @@ -36,7 +40,21 @@ from designate.openstack.common.gettextutils import _ from designate.openstack.common import jsonutils from designate.openstack.common import log as logging from designate.openstack.common import service +from designate.openstack.common import sslutils +from designate.openstack.common import xmlutils +socket_opts = [ + cfg.IntOpt('backlog', + default=4096, + help="Number of backlog requests to configure the socket with"), + cfg.IntOpt('tcp_keepidle', + default=600, + help="Sets the value of TCP_KEEPIDLE in seconds for each " + "server socket. Not supported on OS X."), +] + +CONF = cfg.CONF +CONF.register_opts(socket_opts) LOG = logging.getLogger(__name__) @@ -56,15 +74,54 @@ class Service(service.Service): """ def __init__(self, application, port, - host='0.0.0.0', backlog=128, threads=1000): + host='0.0.0.0', backlog=4096, threads=1000): self.application = application self._port = port self._host = host - self.backlog = backlog - self._socket = eventlet.listen((self._host, self._port), - backlog=self.backlog) + self._backlog = backlog if backlog else CONF.backlog super(Service, self).__init__(threads) + def _get_socket(self, host, port, backlog): + # TODO(dims): eventlet's green dns/socket module does not actually + # support IPv6 in getaddrinfo(). We need to get around this in the + # future or monitor upstream for a fix + info = socket.getaddrinfo(host, + port, + socket.AF_UNSPEC, + socket.SOCK_STREAM)[0] + family = info[0] + bind_addr = info[-1] + + sock = None + retry_until = time.time() + 30 + while not sock and time.time() < retry_until: + try: + sock = eventlet.listen(bind_addr, + backlog=backlog, + family=family) + if sslutils.is_enabled(): + sock = sslutils.wrap(sock) + + except socket.error, err: + if err.args[0] != errno.EADDRINUSE: + raise + eventlet.sleep(0.1) + if not sock: + raise RuntimeError(_("Could not bind to %(host)s:%(port)s " + "after trying for 30 seconds") % + {'host': host, 'port': port}) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # sockets can hang around forever without keepalive + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + + # This option isn't available in the OS X version of eventlet + if hasattr(socket, 'TCP_KEEPIDLE'): + sock.setsockopt(socket.IPPROTO_TCP, + socket.TCP_KEEPIDLE, + CONF.tcp_keepidle) + + return sock + def start(self): """Start serving this service using the provided server instance. @@ -72,8 +129,13 @@ class Service(service.Service): """ super(Service, self).start() + self._socket = self._get_socket(self._host, self._port, self._backlog) self.tg.add_thread(self._run, self.application, self._socket) + @property + def backlog(self): + return self._backlog + @property def host(self): return self._socket.getsockname()[0] if self._socket else self._host @@ -93,7 +155,9 @@ class Service(service.Service): def _run(self, application, socket): """Start a WSGI server in a new green thread.""" logger = logging.getLogger('eventlet.wsgi') - eventlet.wsgi.server(socket, application, custom_pool=self.tg.pool, + eventlet.wsgi.server(socket, + application, + custom_pool=self.tg.pool, log=logging.WritableLogger(logger)) @@ -680,7 +744,7 @@ class XMLDeserializer(TextDeserializer): plurals = set(self.metadata.get('plurals', {})) try: - node = minidom.parseString(datastring).childNodes[0] + node = xmlutils.safe_minidom_parse_string(datastring).childNodes[0] return {node.nodeName: self._from_xml_node(node, plurals)} except expat.ExpatError: msg = _("cannot understand XML") diff --git a/designate/openstack/common/xmlutils.py b/designate/openstack/common/xmlutils.py new file mode 100644 index 00000000..33700485 --- /dev/null +++ b/designate/openstack/common/xmlutils.py @@ -0,0 +1,74 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 IBM +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from xml.dom import minidom +from xml.parsers import expat +from xml import sax +from xml.sax import expatreader + + +class ProtectedExpatParser(expatreader.ExpatParser): + """An expat parser which disables DTD's and entities by default.""" + + def __init__(self, forbid_dtd=True, forbid_entities=True, + *args, **kwargs): + # Python 2.x old style class + expatreader.ExpatParser.__init__(self, *args, **kwargs) + self.forbid_dtd = forbid_dtd + self.forbid_entities = forbid_entities + + def start_doctype_decl(self, name, sysid, pubid, has_internal_subset): + raise ValueError("Inline DTD forbidden") + + def entity_decl(self, entityName, is_parameter_entity, value, base, + systemId, publicId, notationName): + raise ValueError(" entity declaration forbidden") + + def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name): + # expat 1.2 + raise ValueError(" unparsed entity forbidden") + + def external_entity_ref(self, context, base, systemId, publicId): + raise ValueError(" external entity forbidden") + + def notation_decl(self, name, base, sysid, pubid): + raise ValueError(" notation forbidden") + + def reset(self): + expatreader.ExpatParser.reset(self) + if self.forbid_dtd: + self._parser.StartDoctypeDeclHandler = self.start_doctype_decl + self._parser.EndDoctypeDeclHandler = None + if self.forbid_entities: + self._parser.EntityDeclHandler = self.entity_decl + self._parser.UnparsedEntityDeclHandler = self.unparsed_entity_decl + self._parser.ExternalEntityRefHandler = self.external_entity_ref + self._parser.NotationDeclHandler = self.notation_decl + try: + self._parser.SkippedEntityHandler = None + except AttributeError: + # some pyexpat versions do not support SkippedEntity + pass + + +def safe_minidom_parse_string(xml_string): + """Parse an XML string using minidom safely. + + """ + try: + return minidom.parseString(xml_string, parser=ProtectedExpatParser()) + except sax.SAXParseException: + raise expat.ExpatError() diff --git a/designate/policy.py b/designate/policy.py index b3edb314..08e4038e 100644 --- a/designate/policy.py +++ b/designate/policy.py @@ -13,7 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.openstack.common import policy from designate import utils diff --git a/designate/quota/__init__.py b/designate/quota/__init__.py index ca58cb24..6280ed3c 100644 --- a/designate/quota/__init__.py +++ b/designate/quota/__init__.py @@ -13,7 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.quota.base import Quota diff --git a/designate/quota/base.py b/designate/quota/base.py index 4e410347..de6d84fd 100644 --- a/designate/quota/base.py +++ b/designate/quota/base.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. import abc -from designate.openstack.common import cfg +from oslo.config import cfg from designate import exceptions from designate.plugin import Plugin diff --git a/designate/sink/__init__.py b/designate/sink/__init__.py index 4ab92c09..e73d1a16 100644 --- a/designate/sink/__init__.py +++ b/designate/sink/__init__.py @@ -13,7 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg cfg.CONF.register_group(cfg.OptGroup( name='service:sink', title="Configuration for Sink Service" diff --git a/designate/sink/service.py b/designate/sink/service.py index fd7f9a31..5b1a680c 100644 --- a/designate/sink/service.py +++ b/designate/sink/service.py @@ -13,11 +13,12 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.openstack.common import rpc from designate.openstack.common import service from stevedore.named import NamedExtensionManager +from designate import exceptions LOG = logging.getLogger(__name__) @@ -49,15 +50,20 @@ class Service(service.Service): try: return self.extensions_manager.map(_load_extension) except RuntimeError: - # No handlers enabled. No problem. - return [] + # No handlers enabled. Bail! + raise exceptions.ConfigurationError('No designate-sink handlers ' + 'enabled') def start(self): super(Service, self).start() # Setup notification subscriptions and start consuming self._setup_subscriptions() - self.rpc_conn.consume_in_thread_group(self.tg) + self.rpc_conn.consume_in_thread() + + def wait(self): + super(Service, self).wait() + self.rpc_conn.consumer_thread.wait() def stop(self): # Try to shut the connection down, but if we get any sort of diff --git a/designate/sqlalchemy/session.py b/designate/sqlalchemy/session.py index 17c8f6f6..3466dba4 100644 --- a/designate/sqlalchemy/session.py +++ b/designate/sqlalchemy/session.py @@ -24,7 +24,7 @@ from sqlalchemy.exc import DisconnectionError, OperationalError import sqlalchemy.orm from sqlalchemy.pool import NullPool, StaticPool -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.openstack.common.gettextutils import _ diff --git a/designate/storage/__init__.py b/designate/storage/__init__.py index cacd20a8..dec0e3e8 100644 --- a/designate/storage/__init__.py +++ b/designate/storage/__init__.py @@ -13,7 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.storage.base import Storage diff --git a/designate/storage/impl_sqlalchemy/__init__.py b/designate/storage/impl_sqlalchemy/__init__.py index a18e4512..bfd03b34 100644 --- a/designate/storage/impl_sqlalchemy/__init__.py +++ b/designate/storage/impl_sqlalchemy/__init__.py @@ -16,7 +16,7 @@ import time from sqlalchemy.orm import exc from sqlalchemy import distinct, func -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate import exceptions from designate.storage import base diff --git a/designate/tests/__init__.py b/designate/tests/__init__.py index a6e8c1ef..096f734e 100644 --- a/designate/tests/__init__.py +++ b/designate/tests/__init__.py @@ -16,7 +16,7 @@ import copy import unittest2 import mox -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.openstack.common.notifier import test_notifier from designate.openstack.common import policy diff --git a/designate/tests/test_api/test_middleware.py b/designate/tests/test_api/test_middleware.py index f80b846b..b860c57b 100644 --- a/designate/tests/test_api/test_middleware.py +++ b/designate/tests/test_api/test_middleware.py @@ -129,7 +129,7 @@ class KeystoneContextMiddlewareTest(ApiTestCase): context = request.environ['context'] self.assertFalse(context.is_admin) - self.assertEqual('AuthToken', context.auth_tok) + self.assertEqual('AuthToken', context.auth_token) self.assertEqual('UserID', context.user_id) self.assertEqual('TenantID', context.tenant_id) self.assertEqual(['admin', 'Member'], context.roles) @@ -159,7 +159,7 @@ class KeystoneContextMiddlewareTest(ApiTestCase): context = request.environ['context'] self.assertFalse(context.is_admin) - self.assertEqual('AuthToken', context.auth_tok) + self.assertEqual('AuthToken', context.auth_token) self.assertEqual('UserID', context.user_id) self.assertEqual('TenantID', context.original_tenant_id) self.assertEqual('5a993bf8-d521-420a-81e1-192d9cc3d5a0', @@ -183,7 +183,7 @@ class NoAuthContextMiddlewareTest(ApiTestCase): context = request.environ['context'] self.assertTrue(context.is_admin) - self.assertIsNone(context.auth_tok) + self.assertIsNone(context.auth_token) self.assertIsNone(context.user_id) self.assertIsNone(context.tenant_id) self.assertEqual([], context.roles) diff --git a/designate/tests/test_backend/__init__.py b/designate/tests/test_backend/__init__.py index 7bf7cc0d..a8a97cf2 100644 --- a/designate/tests/test_backend/__init__.py +++ b/designate/tests/test_backend/__init__.py @@ -13,7 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.tests import TestCase from designate import backend diff --git a/designate/tests/test_quota/__init__.py b/designate/tests/test_quota/__init__.py index 780040de..b7a9a17c 100644 --- a/designate/tests/test_quota/__init__.py +++ b/designate/tests/test_quota/__init__.py @@ -13,7 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import log as logging from designate.tests import TestCase from designate import quota diff --git a/designate/tests/test_sink/__init__.py b/designate/tests/test_sink/__init__.py deleted file mode 100644 index 303c6295..00000000 --- a/designate/tests/test_sink/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright 2012 Managed I.T. -# -# Author: Kiall Mac Innes -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -from designate.tests import TestCase - - -class SinkTestCase(TestCase): - __test__ = False diff --git a/designate/tests/test_sink/test_service.py b/designate/tests/test_sink/test_service.py deleted file mode 100644 index 1370a7fb..00000000 --- a/designate/tests/test_sink/test_service.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright 2012 Managed I.T. -# -# Author: Kiall Mac Innes -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -from designate.openstack.common import log as logging -from designate.tests.test_sink import SinkTestCase - -LOG = logging.getLogger(__name__) - - -class SinkServiceTest(SinkTestCase): - __test__ = True - - def setUp(self): - super(SinkServiceTest, self).setUp() - - self.sink_service = self.get_sink_service() - - def test_start_and_stop(self): - # Ensures the start/stop actions don't raise - self.sink_service.start() - self.sink_service.stop() diff --git a/designate/utils.py b/designate/utils.py index e9dfd8d9..794461d9 100644 --- a/designate/utils.py +++ b/designate/utils.py @@ -18,7 +18,7 @@ import pkg_resources import json from jinja2 import Template from designate.openstack.common import log as logging -from designate.openstack.common import cfg +from oslo.config import cfg from designate.openstack.common import processutils from designate.openstack.common import timeutils from designate.openstack.common.notifier import api as notifier_api diff --git a/openstack-common.conf b/openstack-common.conf index cae4db3d..97dd9eae 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -21,10 +21,12 @@ module=processutils module=rootwrap module=rpc module=service +module=sslutils module=threadgroup module=timeutils module=uuidutils module=wsgi +module=xmlutils # The base module to hold the copy of openstack.common base=designate diff --git a/requirements.txt b/requirements.txt index a54d0ca5..05f88927 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,7 @@ ipaddr iso8601>=0.1.4 jsonschema>=1.0.0,!=1.4.0,<2 kombu>2.4.7 +oslo.config>=1.1.0 paste pastedeploy>=1.5.0 pbr>=0.5.16,<0.6