Update the oslo code before importing a new module

Note:
- there are some new oslo dependencies pulled in.
- "six" has been added to the requirements.
- rpc messages now have a "namespace" that conflicted with
  the metric namespace.

part of blueprint heat-manage
Change-Id: I92c6b91a28ca0380d13f0a3ba1544b9d517d74a8
This commit is contained in:
Angus Salkeld 2013-05-27 16:05:13 +10:00
parent 067912b4bc
commit 743299b0d3
48 changed files with 2031 additions and 511 deletions

View File

@ -227,9 +227,10 @@ class WatchController(object):
try:
# Engine does not currently support query by namespace/metric
# so we pass None/None and do any filtering locally
null_kwargs = {'metric_namespace': None,
'metric_name': None}
watch_data = self.engine_rpcapi.show_watch_metric(con,
namespace=None,
metric_name=None)
**null_kwargs)
except rpc_common.RemoteError as ex:
return exception.map_remote_error(ex)

View File

@ -18,6 +18,7 @@ from oslo.config import cfg
from heat.openstack.common import local
from heat.common import exception
from heat.common import wsgi
from heat.openstack.common import context
from heat.openstack.common import importutils
from heat.openstack.common import uuidutils
from heat.db import api as db_api
@ -27,7 +28,7 @@ def generate_request_id():
return 'req-' + uuidutils.generate_uuid()
class RequestContext(object):
class RequestContext(context.RequestContext):
"""
Stores information about the security context under which the user
accesses the system, as well as additional request information.
@ -45,19 +46,20 @@ class RequestContext(object):
:param kwargs: Extra arguments that might be present, but we ignore
because they possibly came in from older rpc messages.
"""
super(RequestContext, self).__init__(auth_token=auth_token,
user=username, tenant=tenant,
is_admin=is_admin,
read_only=read_only,
show_deleted=show_deleted,
request_id='unused')
self.auth_token = auth_token
self.username = username
self.password = password
self.aws_creds = aws_creds
self.aws_auth_uri = aws_auth_uri
self.tenant = tenant
self.tenant_id = tenant_id
self.auth_url = auth_url
self.roles = roles or []
self.is_admin = is_admin
self.read_only = read_only
self._show_deleted = show_deleted
self.owner_is_tenant = owner_is_tenant
if overwrite or not hasattr(local.store, 'context'):
self.update_store()
@ -74,7 +76,7 @@ class RequestContext(object):
def to_dict(self):
return {'auth_token': self.auth_token,
'username': self.username,
'username': self.user,
'password': self.password,
'aws_creds': self.aws_creds,
'aws_auth_uri': self.aws_auth_uri,
@ -93,13 +95,6 @@ class RequestContext(object):
"""Return the owner to correlate with an image."""
return self.tenant if self.owner_is_tenant else self.user
@property
def show_deleted(self):
"""Admins can see deleted by default."""
if self._show_deleted or self.is_admin:
return True
return False
def get_admin_context(read_deleted="no"):
return RequestContext(is_admin=True)

View File

@ -577,7 +577,7 @@ class EngineService(service.Service):
return result
@request_context
def show_watch_metric(self, cnxt, namespace=None, metric_name=None):
def show_watch_metric(self, cnxt, metric_namespace=None, metric_name=None):
'''
The show_watch method returns the datapoints for a metric
arg1 -> RPC context.
@ -588,7 +588,7 @@ class EngineService(service.Service):
# DB API and schema does not yet allow us to easily query by
# namespace/metric, but we will want this at some point
# for now, the API can query all metric data and filter locally
if namespace is not None or metric_name is not None:
if metric_namespace is not None or metric_name is not None:
logger.error("Filtering by namespace/metric not yet supported")
return

View File

@ -0,0 +1,82 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Simple class that stores security context information in the web request.
Projects should subclass this class if they wish to enhance the request
context or provide additional information in their specific WSGI pipeline.
"""
import itertools
from heat.openstack.common import uuidutils
def generate_request_id():
return 'req-%s' % uuidutils.generate_uuid()
class RequestContext(object):
"""
Stores information about the security context under which the user
accesses the system, as well as additional request information.
"""
def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False,
read_only=False, show_deleted=False, request_id=None):
self.auth_token = auth_token
self.user = user
self.tenant = tenant
self.is_admin = is_admin
self.read_only = read_only
self.show_deleted = show_deleted
if not request_id:
request_id = generate_request_id()
self.request_id = request_id
def to_dict(self):
return {'user': self.user,
'tenant': self.tenant,
'is_admin': self.is_admin,
'read_only': self.read_only,
'show_deleted': self.show_deleted,
'auth_token': self.auth_token,
'request_id': self.request_id}
def get_admin_context(show_deleted="no"):
context = RequestContext(None,
tenant=None,
is_admin=True,
show_deleted=show_deleted)
return context
def get_context_from_function_and_args(function, args, kwargs):
"""Find an arg of type RequestContext and return it.
This is useful in a couple of decorators where we don't
know much about the function we're wrapping.
"""
for arg in itertools.chain(kwargs.values(), args):
if isinstance(arg, RequestContext):
return arg
return None

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 Openstack, LLC.
# Copyright (c) 2012 OpenStack Foundation.
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
@ -16,6 +16,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import gc
import pprint
import sys
@ -37,7 +39,7 @@ CONF.register_opts(eventlet_backdoor_opts)
def _dont_use_this():
print "Don't use this, just disconnect instead"
print("Don't use this, just disconnect instead")
def _find_objects(t):
@ -46,9 +48,16 @@ def _find_objects(t):
def _print_greenthreads():
for i, gt in enumerate(_find_objects(greenlet.greenlet)):
print i, gt
print(i, gt)
traceback.print_stack(gt.gr_frame)
print
print()
def _print_nativethreads():
for threadId, stack in sys._current_frames().items():
print(threadId)
traceback.print_stack(stack)
print()
def initialize_if_enabled():
@ -57,6 +66,7 @@ def initialize_if_enabled():
'quit': _dont_use_this, # So we don't exit the entire process
'fo': _find_objects,
'pgt': _print_greenthreads,
'pnt': _print_nativethreads,
}
if CONF.backdoor_port is None:

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -98,7 +98,7 @@ def wrap_exception(f):
def _wrap(*args, **kw):
try:
return f(*args, **kw)
except Exception, e:
except Exception as e:
if not isinstance(e, Error):
#exc_type, exc_value, exc_traceback = sys.exc_info()
logging.exception(_('Uncaught exception'))

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -24,10 +24,27 @@ Usual usage in an openstack.common module:
"""
import gettext
import os
t = gettext.translation('openstack-common', 'locale', fallback=True)
_localedir = os.environ.get('heat'.upper() + '_LOCALEDIR')
_t = gettext.translation('heat', localedir=_localedir, fallback=True)
def _(msg):
return t.ugettext(msg)
return _t.ugettext(msg)
def install(domain):
"""Install a _() function using the given translation domain.
Given a translation domain, install a _() function using gettext's
install() function.
The main difference from gettext.install() is that we allow
overriding the default localedir (e.g. /usr/share/locale) using
a translation-domain-specific environment variable (e.g.
NOVA_LOCALEDIR).
"""
gettext.install(domain,
localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
unicode=True)

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -34,15 +34,29 @@ This module provides a few things:
import datetime
import functools
import inspect
import itertools
import json
import types
import xmlrpclib
import six
from heat.openstack.common import timeutils
def to_primitive(value, convert_instances=False, level=0):
_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
inspect.isfunction, inspect.isgeneratorfunction,
inspect.isgenerator, inspect.istraceback, inspect.isframe,
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
inspect.isabstract]
_simple_types = (types.NoneType, int, basestring, bool, float, long)
def to_primitive(value, convert_instances=False, convert_datetime=True,
level=0, max_depth=3):
"""Convert a complex object into primitives.
Handy for JSON serialization. We can optionally handle instances,
@ -56,19 +70,32 @@ def to_primitive(value, convert_instances=False, level=0):
Therefore, convert_instances=True is lossy ... be aware.
"""
nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod,
inspect.isfunction, inspect.isgeneratorfunction,
inspect.isgenerator, inspect.istraceback, inspect.isframe,
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
inspect.isabstract]
for test in nasty:
if test(value):
return unicode(value)
# handle obvious types first - order of basic types determined by running
# full tests on nova project, resulting in the following counts:
# 572754 <type 'NoneType'>
# 460353 <type 'int'>
# 379632 <type 'unicode'>
# 274610 <type 'str'>
# 199918 <type 'dict'>
# 114200 <type 'datetime.datetime'>
# 51817 <type 'bool'>
# 26164 <type 'list'>
# 6491 <type 'float'>
# 283 <type 'tuple'>
# 19 <type 'long'>
if isinstance(value, _simple_types):
return value
# value of itertools.count doesn't get caught by inspects
# above and results in infinite loop when list(value) is called.
if isinstance(value, datetime.datetime):
if convert_datetime:
return timeutils.strtime(value)
else:
return value
# value of itertools.count doesn't get caught by nasty_type_tests
# and results in infinite loop when list(value) is called.
if type(value) == itertools.count:
return unicode(value)
return six.text_type(value)
# FIXME(vish): Workaround for LP bug 852095. Without this workaround,
# tests that raise an exception in a mocked method that
@ -78,52 +105,46 @@ def to_primitive(value, convert_instances=False, level=0):
if getattr(value, '__module__', None) == 'mox':
return 'mock'
if level > 3:
if level > max_depth:
return '?'
# The try block may not be necessary after the class check above,
# but just in case ...
try:
recursive = functools.partial(to_primitive,
convert_instances=convert_instances,
convert_datetime=convert_datetime,
level=level,
max_depth=max_depth)
if isinstance(value, dict):
return dict((k, recursive(v)) for k, v in value.iteritems())
elif isinstance(value, (list, tuple)):
return [recursive(lv) for lv in value]
# It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly
# handled
if isinstance(value, xmlrpclib.DateTime):
value = datetime.datetime(*tuple(value.timetuple())[:6])
if isinstance(value, (list, tuple)):
o = []
for v in value:
o.append(to_primitive(v, convert_instances=convert_instances,
level=level))
return o
elif isinstance(value, dict):
o = {}
for k, v in value.iteritems():
o[k] = to_primitive(v, convert_instances=convert_instances,
level=level)
return o
elif isinstance(value, datetime.datetime):
if convert_datetime and isinstance(value, datetime.datetime):
return timeutils.strtime(value)
elif hasattr(value, 'iteritems'):
return to_primitive(dict(value.iteritems()),
convert_instances=convert_instances,
level=level + 1)
return recursive(dict(value.iteritems()), level=level + 1)
elif hasattr(value, '__iter__'):
return to_primitive(list(value),
convert_instances=convert_instances,
level=level)
return recursive(list(value))
elif convert_instances and hasattr(value, '__dict__'):
# Likely an instance of something. Watch for cycles.
# Ignore class member vars.
return to_primitive(value.__dict__,
convert_instances=convert_instances,
level=level + 1)
return recursive(value.__dict__, level=level + 1)
else:
if any(test(value) for test in _nasty_type_tests):
return six.text_type(value)
return value
except TypeError:
# Class objects are tricky since they may define something like
# __iter__ defined but it isn't callable as list().
return unicode(value)
return six.text_type(value)
def dumps(value, default=to_primitive, **kwargs):

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -26,6 +26,9 @@ class WeakLocal(corolocal.local):
def __getattribute__(self, attr):
rval = corolocal.local.__getattribute__(self, attr)
if rval:
# NOTE(mikal): this bit is confusing. What is stored is a weak
# reference, not the value itself. We therefore need to lookup
# the weak reference and return the inner value here.
rval = rval()
return rval
@ -34,4 +37,12 @@ class WeakLocal(corolocal.local):
return corolocal.local.__setattr__(self, attr, value)
# NOTE(mikal): the name "store" should be deprecated in the future
store = WeakLocal()
# A "weak" store uses weak references and allows an object to fall out of scope
# when it falls out of scope in the code that uses the thread local storage. A
# "strong" store will hold a reference to the object so that it never falls out
# of scope.
weak_store = WeakLocal()
strong_store = corolocal.local

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
@ -29,6 +29,7 @@ It also allows setting of formatting information through conf.
"""
import ConfigParser
import cStringIO
import inspect
import itertools
@ -36,19 +37,17 @@ import logging
import logging.config
import logging.handlers
import os
import stat
import sys
import traceback
from oslo.config import cfg
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
from heat.openstack.common import local
from heat.openstack.common import notifier
_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
common_cli_opts = [
@ -73,11 +72,13 @@ logging_cli_opts = [
'documentation for details on logging configuration '
'files.'),
cfg.StrOpt('log-format',
default=_DEFAULT_LOG_FORMAT,
default=None,
metavar='FORMAT',
help='A logging.Formatter log message format string which may '
'use any of the available logging.LogRecord attributes. '
'Default: %(default)s'),
'This option is deprecated. Please use '
'logging_context_format_string and '
'logging_default_format_string instead.'),
cfg.StrOpt('log-date-format',
default=_DEFAULT_LOG_DATE_FORMAT,
metavar='DATE_FORMAT',
@ -87,11 +88,11 @@ logging_cli_opts = [
metavar='PATH',
deprecated_name='logfile',
help='(Optional) Name of log file to output to. '
'If not set, logging will go to stdout.'),
'If no default is set, logging will go to stdout.'),
cfg.StrOpt('log-dir',
deprecated_name='logdir',
help='(Optional) The directory to keep log files in '
'(will be prepended to --log-file)'),
help='(Optional) The base directory used for relative '
'--log-file paths'),
cfg.BoolOpt('use-syslog',
default=False,
help='Use syslog for logging.'),
@ -103,17 +104,14 @@ logging_cli_opts = [
generic_log_opts = [
cfg.BoolOpt('use_stderr',
default=True,
help='Log output to standard error'),
cfg.StrOpt('logfile_mode',
default='0644',
help='Default file mode used when creating log files'),
help='Log output to standard error')
]
log_opts = [
cfg.StrOpt('logging_context_format_string',
default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
'[%(request_id)s %(user)s %(tenant)s] %(instance)s'
'%(message)s',
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s [%(request_id)s %(user)s %(tenant)s] '
'%(instance)s%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
@ -210,7 +208,27 @@ def _get_log_file_path(binary=None):
return '%s.log' % (os.path.join(logdir, binary),)
class ContextAdapter(logging.LoggerAdapter):
class BaseLoggerAdapter(logging.LoggerAdapter):
def audit(self, msg, *args, **kwargs):
self.log(logging.AUDIT, msg, *args, **kwargs)
class LazyAdapter(BaseLoggerAdapter):
def __init__(self, name='unknown', version='unknown'):
self._logger = None
self.extra = {}
self.name = name
self.version = version
@property
def logger(self):
if not self._logger:
self._logger = getLogger(self.name, self.version)
return self._logger
class ContextAdapter(BaseLoggerAdapter):
warn = logging.LoggerAdapter.warning
def __init__(self, logger, project_name, version_string):
@ -218,8 +236,9 @@ class ContextAdapter(logging.LoggerAdapter):
self.project = project_name
self.version = version_string
def audit(self, msg, *args, **kwargs):
self.log(logging.AUDIT, msg, *args, **kwargs)
@property
def handlers(self):
return self.logger.handlers
def deprecated(self, msg, *args, **kwargs):
stdmsg = _("Deprecated: %s") % msg
@ -303,17 +322,6 @@ class JSONFormatter(logging.Formatter):
return jsonutils.dumps(message)
class PublishErrorsHandler(logging.Handler):
def emit(self, record):
if ('heat.openstack.common.notifier.log_notifier' in
CONF.notification_driver):
return
notifier.api.notify(None, 'error.publisher',
'error_notification',
notifier.api.ERROR,
dict(error=record.msg))
def _create_logging_excepthook(product_name):
def logging_excepthook(type, value, tb):
extra = {}
@ -323,18 +331,33 @@ def _create_logging_excepthook(product_name):
return logging_excepthook
class LogConfigError(Exception):
message = _('Error loading logging config %(log_config)s: %(err_msg)s')
def __init__(self, log_config, err_msg):
self.log_config = log_config
self.err_msg = err_msg
def __str__(self):
return self.message % dict(log_config=self.log_config,
err_msg=self.err_msg)
def _load_log_config(log_config):
try:
logging.config.fileConfig(log_config)
except ConfigParser.Error as exc:
raise LogConfigError(log_config, str(exc))
def setup(product_name):
"""Setup logging."""
sys.excepthook = _create_logging_excepthook(product_name)
if CONF.log_config:
try:
logging.config.fileConfig(CONF.log_config)
except Exception:
traceback.print_exc()
raise
_load_log_config(CONF.log_config)
else:
_setup_logging_from_conf(product_name)
_setup_logging_from_conf()
sys.excepthook = _create_logging_excepthook(product_name)
def set_defaults(logging_context_format_string):
@ -367,8 +390,8 @@ def _find_facility_from_conf():
return facility
def _setup_logging_from_conf(product_name):
log_root = getLogger(product_name).logger
def _setup_logging_from_conf():
log_root = getLogger(None).logger
for handler in log_root.handlers:
log_root.removeHandler(handler)
@ -383,11 +406,6 @@ def _setup_logging_from_conf(product_name):
filelog = logging.handlers.WatchedFileHandler(logpath)
log_root.addHandler(filelog)
mode = int(CONF.logfile_mode, 8)
st = os.stat(logpath)
if st.st_mode != (stat.S_IFREG | mode):
os.chmod(logpath, mode)
if CONF.use_stderr:
streamlog = ColorHandler()
log_root.addHandler(streamlog)
@ -399,14 +417,22 @@ def _setup_logging_from_conf(product_name):
log_root.addHandler(streamlog)
if CONF.publish_errors:
log_root.addHandler(PublishErrorsHandler(logging.ERROR))
handler = importutils.import_object(
"heat.openstack.common.log_handler.PublishErrorsHandler",
logging.ERROR)
log_root.addHandler(handler)
datefmt = CONF.log_date_format
for handler in log_root.handlers:
datefmt = CONF.log_date_format
# NOTE(alaski): CONF.log_format overrides everything currently. This
# should be deprecated in favor of context aware formatting.
if CONF.log_format:
handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
datefmt=datefmt))
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
log_root.info('Deprecated: log_format is now deprecated and will '
'be removed in the next release')
else:
handler.setFormatter(ContextFormatter(datefmt=datefmt))
if CONF.debug:
log_root.setLevel(logging.DEBUG)
@ -415,14 +441,11 @@ def _setup_logging_from_conf(product_name):
else:
log_root.setLevel(logging.WARNING)
level = logging.NOTSET
for pair in CONF.default_log_levels:
mod, _sep, level_name = pair.partition('=')
level = logging.getLevelName(level_name)
logger = logging.getLogger(mod)
logger.setLevel(level)
for handler in log_root.handlers:
logger.addHandler(handler)
_loggers = {}
@ -435,6 +458,15 @@ def getLogger(name='unknown', version='unknown'):
return _loggers[name]
def getLazyLogger(name='unknown', version='unknown'):
"""
create a pass-through logger that does not create the real logger
until it is really needed and delegates all calls to the real logger
once it is created
"""
return LazyAdapter(name, version)
class WritableLogger(object):
"""A thin wrapper that responds to `write` and logs."""
@ -446,7 +478,7 @@ class WritableLogger(object):
self.logger.log(self.level, msg)
class LegacyFormatter(logging.Formatter):
class ContextFormatter(logging.Formatter):
"""A context.RequestContext aware formatter configured through flags.
The flags used to set format strings are: logging_context_format_string

View File

@ -84,7 +84,7 @@ class FixedIntervalLoopingCall(LoopingCallBase):
LOG.warn(_('task run outlasted interval by %s sec') %
-delay)
greenthread.sleep(delay if delay > 0 else 0)
except LoopingCallDone, e:
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
@ -131,7 +131,7 @@ class DynamicLoopingCall(LoopingCallBase):
LOG.debug(_('Dynamic looping call sleeping for %.02f '
'seconds'), idle)
greenthread.sleep(idle)
except LoopingCallDone, e:
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -19,7 +19,8 @@
Network-related utilities and helper functions.
"""
import logging
from heat.openstack.common import log as logging
LOG = logging.getLogger(__name__)

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -30,7 +30,6 @@ LOG = logging.getLogger(__name__)
notifier_opts = [
cfg.MultiStrOpt('notification_driver',
default=[],
deprecated_name='list_notifier_drivers',
help='Driver or drivers to handle sending notifications'),
cfg.StrOpt('default_notification_level',
default='INFO',

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 OpenStack, LLC.
# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -57,14 +57,15 @@ as it allows particular rules to be explicitly disabled.
"""
import abc
import logging
import re
import urllib
import six
import urllib2
from heat.openstack.common.gettextutils import _
from heat.openstack.common import jsonutils
from heat.openstack.common import log as logging
LOG = logging.getLogger(__name__)
@ -436,7 +437,7 @@ def _parse_list_rule(rule):
or_list.append(AndCheck(and_list))
# If we have only one check, omit the "or"
if len(or_list) == 0:
if not or_list:
return FalseCheck()
elif len(or_list) == 1:
return or_list[0]
@ -775,5 +776,5 @@ class GenericCheck(Check):
# TODO(termie): do dict inspection via dot syntax
match = self.match % target
if self.kind in creds:
return match == unicode(creds[self.kind])
return match == six.text_type(creds[self.kind])
return False

View File

@ -0,0 +1,247 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
System-level utilities and helper functions.
"""
import os
import random
import shlex
import signal
from eventlet.green import subprocess
from eventlet import greenthread
from heat.openstack.common.gettextutils import _
from heat.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class InvalidArgumentError(Exception):
def __init__(self, message=None):
super(InvalidArgumentError, self).__init__(message)
class UnknownArgumentError(Exception):
def __init__(self, message=None):
super(UnknownArgumentError, self).__init__(message)
class ProcessExecutionError(Exception):
def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
description=None):
self.exit_code = exit_code
self.stderr = stderr
self.stdout = stdout
self.cmd = cmd
self.description = description
if description is None:
description = "Unexpected error while running command."
if exit_code is None:
exit_code = '-'
message = ("%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r"
% (description, cmd, exit_code, stdout, stderr))
super(ProcessExecutionError, self).__init__(message)
class NoRootWrapSpecified(Exception):
def __init__(self, message=None):
super(NoRootWrapSpecified, self).__init__(message)
def _subprocess_setup():
# Python installs a SIGPIPE handler by default. This is usually not what
# non-Python subprocesses expect.
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
def execute(*cmd, **kwargs):
"""
Helper method to shell out and execute a command through subprocess with
optional retry.
:param cmd: Passed to subprocess.Popen.
:type cmd: string
:param process_input: Send to opened process.
:type proces_input: string
:param check_exit_code: Single bool, int, or list of allowed exit
codes. Defaults to [0]. Raise
:class:`ProcessExecutionError` unless
program exits with one of these code.
:type check_exit_code: boolean, int, or [int]
:param delay_on_retry: True | False. Defaults to True. If set to True,
wait a short amount of time before retrying.
:type delay_on_retry: boolean
:param attempts: How many times to retry cmd.
:type attempts: int
:param run_as_root: True | False. Defaults to False. If set to True,
the command is prefixed by the command specified
in the root_helper kwarg.
:type run_as_root: boolean
:param root_helper: command to prefix to commands called with
run_as_root=True
:type root_helper: string
:param shell: whether or not there should be a shell used to
execute this command. Defaults to false.
:type shell: boolean
:returns: (stdout, stderr) from process execution
:raises: :class:`UnknownArgumentError` on
receiving unknown arguments
:raises: :class:`ProcessExecutionError`
"""
process_input = kwargs.pop('process_input', None)
check_exit_code = kwargs.pop('check_exit_code', [0])
ignore_exit_code = False
delay_on_retry = kwargs.pop('delay_on_retry', True)
attempts = kwargs.pop('attempts', 1)
run_as_root = kwargs.pop('run_as_root', False)
root_helper = kwargs.pop('root_helper', '')
shell = kwargs.pop('shell', False)
if isinstance(check_exit_code, bool):
ignore_exit_code = not check_exit_code
check_exit_code = [0]
elif isinstance(check_exit_code, int):
check_exit_code = [check_exit_code]
if kwargs:
raise UnknownArgumentError(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs)
if run_as_root and os.geteuid() != 0:
if not root_helper:
raise NoRootWrapSpecified(
message=('Command requested root, but did not specify a root '
'helper.'))
cmd = shlex.split(root_helper) + list(cmd)
cmd = map(str, cmd)
while attempts > 0:
attempts -= 1
try:
LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
_PIPE = subprocess.PIPE # pylint: disable=E1101
if os.name == 'nt':
preexec_fn = None
close_fds = False
else:
preexec_fn = _subprocess_setup
close_fds = True
obj = subprocess.Popen(cmd,
stdin=_PIPE,
stdout=_PIPE,
stderr=_PIPE,
close_fds=close_fds,
preexec_fn=preexec_fn,
shell=shell)
result = None
if process_input is not None:
result = obj.communicate(process_input)
else:
result = obj.communicate()
obj.stdin.close() # pylint: disable=E1101
_returncode = obj.returncode # pylint: disable=E1101
if _returncode:
LOG.debug(_('Result was %s') % _returncode)
if not ignore_exit_code and _returncode not in check_exit_code:
(stdout, stderr) = result
raise ProcessExecutionError(exit_code=_returncode,
stdout=stdout,
stderr=stderr,
cmd=' '.join(cmd))
return result
except ProcessExecutionError:
if not attempts:
raise
else:
LOG.debug(_('%r failed. Retrying.'), cmd)
if delay_on_retry:
greenthread.sleep(random.randint(20, 200) / 100.0)
finally:
# NOTE(termie): this appears to be necessary to let the subprocess
# call clean something up in between calls, without
# it two execute calls in a row hangs the second one
greenthread.sleep(0)
def trycmd(*args, **kwargs):
"""
A wrapper around execute() to more easily handle warnings and errors.
Returns an (out, err) tuple of strings containing the output of
the command's stdout and stderr. If 'err' is not empty then the
command can be considered to have failed.
:discard_warnings True | False. Defaults to False. If set to True,
then for succeeding commands, stderr is cleared
"""
discard_warnings = kwargs.pop('discard_warnings', False)
try:
out, err = execute(*args, **kwargs)
failed = False
except ProcessExecutionError, exn:
out, err = '', str(exn)
failed = True
if not failed and discard_warnings and err:
# Handle commands that output to stderr but otherwise succeed
err = ''
return out, err
def ssh_execute(ssh, cmd, process_input=None,
addl_env=None, check_exit_code=True):
LOG.debug(_('Running cmd (SSH): %s'), cmd)
if addl_env:
raise InvalidArgumentError(_('Environment not supported over SSH'))
if process_input:
# This is (probably) fixable if we need it...
raise InvalidArgumentError(_('process_input not supported over SSH'))
stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd)
channel = stdout_stream.channel
# NOTE(justinsb): This seems suspicious...
# ...other SSH clients have buffering issues with this approach
stdout = stdout_stream.read()
stderr = stderr_stream.read()
stdin_stream.close()
exit_status = channel.recv_exit_status()
# exit_status == -1 if no exit code was returned
if exit_status != -1:
LOG.debug(_('Result was %s') % exit_status)
if check_exit_code and exit_status != 0:
raise ProcessExecutionError(exit_code=exit_status,
stdout=stdout,
stderr=stderr,
cmd=cmd)
return (stdout, stderr)

View File

@ -25,9 +25,17 @@ For some wrappers that add message versioning to rpc, see:
rpc.proxy
"""
import inspect
from oslo.config import cfg
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import local
from heat.openstack.common import log as logging
LOG = logging.getLogger(__name__)
rpc_opts = [
@ -63,7 +71,8 @@ rpc_opts = [
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
cfg.CONF.register_opts(rpc_opts)
CONF = cfg.CONF
CONF.register_opts(rpc_opts)
def set_defaults(control_exchange):
@ -84,10 +93,27 @@ def create_connection(new=True):
:returns: An instance of openstack.common.rpc.common.Connection
"""
return _get_impl().create_connection(cfg.CONF, new=new)
return _get_impl().create_connection(CONF, new=new)
def call(context, topic, msg, timeout=None):
def _check_for_lock():
if not CONF.debug:
return None
if ((hasattr(local.strong_store, 'locks_held')
and local.strong_store.locks_held)):
stack = ' :: '.join([frame[3] for frame in inspect.stack()])
LOG.warn(_('A RPC is being made while holding a lock. The locks '
'currently held are %(locks)s. This is probably a bug. '
'Please report it. Include the following: [%(stack)s].'),
{'locks': local.strong_store.locks_held,
'stack': stack})
return True
return False
def call(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
@ -101,13 +127,17 @@ def call(context, topic, msg, timeout=None):
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if a RPC call is made
with a lock held.
:returns: A dict from the remote method.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
if check_for_lock:
_check_for_lock()
return _get_impl().call(CONF, context, topic, msg, timeout)
def cast(context, topic, msg):
@ -125,7 +155,7 @@ def cast(context, topic, msg):
:returns: None
"""
return _get_impl().cast(cfg.CONF, context, topic, msg)
return _get_impl().cast(CONF, context, topic, msg)
def fanout_cast(context, topic, msg):
@ -146,10 +176,10 @@ def fanout_cast(context, topic, msg):
:returns: None
"""
return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
return _get_impl().fanout_cast(CONF, context, topic, msg)
def multicall(context, topic, msg, timeout=None):
def multicall(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
@ -167,6 +197,8 @@ def multicall(context, topic, msg, timeout=None):
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if a RPC call is made
with a lock held.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
@ -176,7 +208,9 @@ def multicall(context, topic, msg, timeout=None):
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
if check_for_lock:
_check_for_lock()
return _get_impl().multicall(CONF, context, topic, msg, timeout)
def notify(context, topic, msg, envelope=False):
@ -218,7 +252,7 @@ def cast_to_server(context, server_params, topic, msg):
:returns: None
"""
return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
return _get_impl().cast_to_server(CONF, context, server_params, topic,
msg)
@ -234,7 +268,7 @@ def fanout_cast_to_server(context, server_params, topic, msg):
:returns: None
"""
return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
return _get_impl().fanout_cast_to_server(CONF, context, server_params,
topic, msg)
@ -264,10 +298,10 @@ def _get_impl():
global _RPCIMPL
if _RPCIMPL is None:
try:
_RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
_RPCIMPL = importutils.import_module(CONF.rpc_backend)
except ImportError:
# For backwards compatibility with older nova config.
impl = cfg.CONF.rpc_backend.replace('nova.rpc',
'nova.openstack.common.rpc')
impl = CONF.rpc_backend.replace('nova.rpc',
'nova.openstack.common.rpc')
_RPCIMPL = importutils.import_module(impl)
return _RPCIMPL

View File

@ -25,13 +25,19 @@ Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
AMQP, but is deprecated and predates this code.
"""
import collections
import inspect
import sys
import uuid
from eventlet import greenpool
from eventlet import pools
from eventlet import queue
from eventlet import semaphore
# TODO(pekowsk): Remove import cfg and below comment in Havana.
# This import should no longer be needed when the amqp_rpc_single_reply_queue
# option is removed.
from oslo.config import cfg
from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _
@ -40,6 +46,17 @@ from heat.openstack.common import log as logging
from heat.openstack.common.rpc import common as rpc_common
# TODO(pekowski): Remove this option in Havana.
amqp_opts = [
cfg.BoolOpt('amqp_rpc_single_reply_queue',
default=False,
help='Enable a fast single reply queue if using AMQP based '
'RPC like RabbitMQ or Qpid.'),
]
cfg.CONF.register_opts(amqp_opts)
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
@ -51,6 +68,7 @@ class Pool(pools.Pool):
kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
def create(self):
@ -60,6 +78,16 @@ class Pool(pools.Pool):
def empty(self):
while self.free_items:
self.get().close()
# Force a new connection pool to be created.
# Note that this was added due to failing unit test cases. The issue
# is the above "while loop" gets all the cached connections from the
# pool and closes them, but never returns them to the pool, a pool
# leak. The unit tests hang waiting for an item to be returned to the
# pool. The unit tests get here via the teatDown() method. In the run
# time code, it gets here via cleanup() and only appears in service.py
# just before doing a sys.exit(), so cleanup() only happens once and
# the leakage is not a problem.
self.connection_cls.pool = None
_pool_create_sem = semaphore.Semaphore()
@ -137,6 +165,12 @@ class ConnectionContext(rpc_common.Connection):
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
self.connection.join_consumer_pool(callback,
pool_name,
topic,
exchange_name)
def consume_in_thread(self):
self.connection.consume_in_thread()
@ -148,8 +182,46 @@ class ConnectionContext(rpc_common.Connection):
raise rpc_common.InvalidRPCConnectionReuse()
def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
ending=False, log_failure=True):
class ReplyProxy(ConnectionContext):
""" Connection class for RPC replies / callbacks """
def __init__(self, conf, connection_pool):
self._call_waiters = {}
self._num_call_waiters = 0
self._num_call_waiters_wrn_threshhold = 10
self._reply_q = 'reply_' + uuid.uuid4().hex
super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
self.declare_direct_consumer(self._reply_q, self._process_data)
self.consume_in_thread()
def _process_data(self, message_data):
msg_id = message_data.pop('_msg_id', None)
waiter = self._call_waiters.get(msg_id)
if not waiter:
LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s'
', message : %(data)s'), {'msg_id': msg_id,
'data': message_data})
else:
waiter.put(message_data)
def add_call_waiter(self, waiter, msg_id):
self._num_call_waiters += 1
if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
LOG.warn(_('Number of call waiters is greater than warning '
'threshhold: %d. There could be a MulticallProxyWaiter '
'leak.') % self._num_call_waiters_wrn_threshhold)
self._num_call_waiters_wrn_threshhold *= 2
self._call_waiters[msg_id] = waiter
def del_call_waiter(self, msg_id):
self._num_call_waiters -= 1
del self._call_waiters[msg_id]
def get_reply_q(self):
return self._reply_q
def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
failure=None, ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
@ -168,13 +240,22 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
_add_unique_id(msg)
# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
# Otherwise use the msg_id for backward compatibilty.
if reply_q:
msg['_msg_id'] = msg_id
conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
else:
conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call"""
def __init__(self, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
self.reply_q = kwargs.pop('reply_q', None)
self.conf = kwargs.pop('conf')
super(RpcContext, self).__init__(**kwargs)
@ -182,13 +263,14 @@ class RpcContext(rpc_common.CommonRpcContext):
values = self.to_dict()
values['conf'] = self.conf
values['msg_id'] = self.msg_id
values['reply_q'] = self.reply_q
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
connection_pool=None, log_failure=True):
if self.msg_id:
msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
ending, log_failure)
msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
reply, failure, ending, log_failure)
if ending:
self.msg_id = None
@ -204,6 +286,7 @@ def unpack_context(conf, msg):
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
context_dict['reply_q'] = msg.pop('_reply_q', None)
context_dict['conf'] = conf
ctx = RpcContext.from_dict(context_dict)
rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
@ -224,15 +307,86 @@ def pack_context(msg, context):
msg.update(context_d)
class ProxyCallback(object):
"""Calls methods on a proxy object based on method and args."""
class _MsgIdCache(object):
"""This class checks any duplicate messages."""
def __init__(self, conf, proxy, connection_pool):
self.proxy = proxy
# NOTE: This value is considered can be a configuration item, but
# it is not necessary to change its value in most cases,
# so let this value as static for now.
DUP_MSG_CHECK_SIZE = 16
def __init__(self, **kwargs):
self.prev_msgids = collections.deque([],
maxlen=self.DUP_MSG_CHECK_SIZE)
def check_duplicate_message(self, message_data):
"""AMQP consumers may read same message twice when exceptions occur
before ack is returned. This method prevents doing it.
"""
if UNIQUE_ID in message_data:
msg_id = message_data[UNIQUE_ID]
if msg_id not in self.prev_msgids:
self.prev_msgids.append(msg_id)
else:
raise rpc_common.DuplicateMessageError(msg_id=msg_id)
def _add_unique_id(msg):
"""Add unique_id for checking duplicate messages."""
unique_id = uuid.uuid4().hex
msg.update({UNIQUE_ID: unique_id})
LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
class _ThreadPoolWithWait(object):
"""Base class for a delayed invocation manager used by
the Connection class to start up green threads
to handle incoming messages.
"""
def __init__(self, conf, connection_pool):
self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
self.connection_pool = connection_pool
self.conf = conf
def wait(self):
"""Wait for all callback threads to exit."""
self.pool.waitall()
class CallbackWrapper(_ThreadPoolWithWait):
"""Wraps a straight callback to allow it to be invoked in a green
thread.
"""
def __init__(self, conf, callback, connection_pool):
"""
:param conf: cfg.CONF instance
:param callback: a callable (probably a function)
:param connection_pool: connection pool as returned by
get_connection_pool()
"""
super(CallbackWrapper, self).__init__(
conf=conf,
connection_pool=connection_pool,
)
self.callback = callback
def __call__(self, message_data):
self.pool.spawn_n(self.callback, message_data)
class ProxyCallback(_ThreadPoolWithWait):
"""Calls methods on a proxy object based on method and args."""
def __init__(self, conf, proxy, connection_pool):
super(ProxyCallback, self).__init__(
conf=conf,
connection_pool=connection_pool,
)
self.proxy = proxy
self.msg_id_cache = _MsgIdCache()
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
@ -251,18 +405,21 @@ class ProxyCallback(object):
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
version = message_data.get('version', None)
version = message_data.get('version')
namespace = message_data.get('namespace')
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
self.pool.spawn_n(self._process_data, ctxt, version, method, args)
self.pool.spawn_n(self._process_data, ctxt, version, method,
namespace, args)
def _process_data(self, ctxt, version, method, args):
def _process_data(self, ctxt, version, method, namespace, args):
"""Process a message in a new thread.
If the proxy object we have has a dispatch method
@ -273,7 +430,8 @@ class ProxyCallback(object):
"""
ctxt.update_store()
try:
rval = self.proxy.dispatch(ctxt, version, method, **args)
rval = self.proxy.dispatch(ctxt, version, method, namespace,
**args)
# Check if the result was a generator
if inspect.isgenerator(rval):
for x in rval:
@ -289,15 +447,73 @@ class ProxyCallback(object):
connection_pool=self.connection_pool,
log_failure=False)
except Exception:
LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
def wait(self):
"""Wait for all callback threads to exit."""
self.pool.waitall()
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
LOG.error(_('Exception during message handling'),
exc_info=exc_info)
ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
class MulticallProxyWaiter(object):
def __init__(self, conf, msg_id, timeout, connection_pool):
self._msg_id = msg_id
self._timeout = timeout or conf.rpc_response_timeout
self._reply_proxy = connection_pool.reply_proxy
self._done = False
self._got_ending = False
self._conf = conf
self._dataqueue = queue.LightQueue()
# Add this caller to the reply proxy's call_waiters
self._reply_proxy.add_call_waiter(self, self._msg_id)
self.msg_id_cache = _MsgIdCache()
def put(self, data):
self._dataqueue.put(data)
def done(self):
if self._done:
return
self._done = True
# Remove this caller from reply proxy's call_waiters
self._reply_proxy.del_call_waiter(self._msg_id)
def _process_data(self, data):
result = None
self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
result = rpc_common.deserialize_remote_exception(self._conf,
failure)
elif data.get('ending', False):
self._got_ending = True
else:
result = data['result']
return result
def __iter__(self):
"""Return a result until we get a reply with an 'ending" flag"""
if self._done:
raise StopIteration
while True:
try:
data = self._dataqueue.get(timeout=self._timeout)
result = self._process_data(data)
except queue.Empty:
self.done()
raise rpc_common.Timeout()
except Exception:
with excutils.save_and_reraise_exception():
self.done()
if self._got_ending:
self.done()
raise StopIteration
if isinstance(result, Exception):
self.done()
raise result
yield result
#TODO(pekowski): Remove MulticallWaiter() in Havana.
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
self._connection = connection
@ -307,6 +523,7 @@ class MulticallWaiter(object):
self._done = False
self._got_ending = False
self._conf = conf
self.msg_id_cache = _MsgIdCache()
def done(self):
if self._done:
@ -318,6 +535,7 @@ class MulticallWaiter(object):
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(self._conf,
@ -353,22 +571,41 @@ def create_connection(conf, new, connection_pool):
return ConnectionContext(conf, connection_pool, pooled=not new)
_reply_proxy_create_sem = semaphore.Semaphore()
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
# TODO(pekowski): Remove all these comments in Havana.
# For amqp_rpc_single_reply_queue = False,
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
# For amqp_rpc_single_reply_queue = True,
# The 'with' statement is mandatory for closing the connection
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
_add_unique_id(msg)
pack_context(msg, context)
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
# TODO(pekowski): Remove this flag and the code under the if clause
# in Havana.
if not conf.amqp_rpc_single_reply_queue:
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
else:
with _reply_proxy_create_sem:
if not connection_pool.reply_proxy:
connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
@ -385,6 +622,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg))
@ -393,6 +631,7 @@ def cast(conf, context, topic, msg, connection_pool):
def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
@ -400,6 +639,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
@ -409,6 +649,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
def fanout_cast_to_server(conf, context, server_params, topic, msg,
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
@ -420,10 +661,11 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:
msg = rpc_common.serialize_msg(msg, force_envelope=True)
msg = rpc_common.serialize_msg(msg)
conn.notify_send(topic, msg)

View File

@ -22,6 +22,7 @@ import sys
import traceback
from oslo.config import cfg
import six
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
@ -49,8 +50,8 @@ deserialize_msg().
The current message format (version 2.0) is very simple. It is:
{
'heat.version': <RPC Envelope Version as a String>,
'heat.message': <Application Message Payload, JSON encoded>
'oslo.version': <RPC Envelope Version as a String>,
'oslo.message': <Application Message Payload, JSON encoded>
}
Message format version '1.0' is just considered to be the messages we sent
@ -66,12 +67,8 @@ to the messaging libraries as a dict.
'''
_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'heat.version'
_MESSAGE_KEY = 'heat.message'
# TODO(russellb) Turn this on after Grizzly.
_SEND_RPC_ENVELOPE = False
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
class RPCException(Exception):
@ -122,7 +119,29 @@ class Timeout(RPCException):
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
message = _("Timeout while waiting on RPC response.")
message = _('Timeout while waiting on RPC response - '
'topic: "%(topic)s", RPC method: "%(method)s" '
'info: "%(info)s"')
def __init__(self, info=None, topic=None, method=None):
"""
:param info: Extra info to convey to the user
:param topic: The topic that the rpc call was sent to
:param rpc_method_name: The name of the rpc method being
called
"""
self.info = info
self.topic = topic
self.method = method
super(Timeout, self).__init__(
None,
info=info or _('<unknown>'),
topic=topic or _('<unknown>'),
method=method or _('<unknown>'))
class DuplicateMessageError(RPCException):
message = _("Found duplicate message(%(msg_id)s). Skipping it.")
class InvalidRPCConnectionReuse(RPCException):
@ -139,6 +158,10 @@ class UnsupportedRpcEnvelopeVersion(RPCException):
"not supported by this endpoint.")
class RpcVersionCapError(RPCException):
message = _("Specified RPC version cap, %(version_cap)s, is too low")
class Connection(object):
"""A connection, returned by rpc.create_connection().
@ -197,6 +220,28 @@ class Connection(object):
"""
raise NotImplementedError()
def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
Exactly one member of a given pool will receive each message.
A message will be delivered to multiple pools, if more than
one is created.
:param callback: Callable to be invoked for each message.
:type callback: callable accepting one argument
:param pool_name: The name of the consumer pool.
:type pool_name: str
:param topic: The routing topic for desired messages.
:type topic: str
:param exchange_name: The name of the message exchange where
the client should attach. Defaults to
the configured exchange.
:type exchange_name: str
"""
raise NotImplementedError()
def consume_in_thread(self):
"""Spawn a thread to handle incoming messages.
@ -236,7 +281,7 @@ def _safe_log(log_func, msg, msg_data):
for elem in arg[:-1]:
d = d[elem]
d[arg[-1]] = '<SANITIZED>'
except KeyError, e:
except KeyError as e:
LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
{'item': arg,
'err': e})
@ -259,7 +304,8 @@ def serialize_remote_exception(failure_info, log_failure=True):
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
if log_failure:
LOG.error(_("Returning exception %s to caller"), unicode(failure))
LOG.error(_("Returning exception %s to caller"),
six.text_type(failure))
LOG.error(tb)
kwargs = {}
@ -269,7 +315,7 @@ def serialize_remote_exception(failure_info, log_failure=True):
data = {
'class': str(failure.__class__.__name__),
'module': str(failure.__class__.__module__),
'message': unicode(failure),
'message': six.text_type(failure),
'tb': tb,
'args': failure.args,
'kwargs': kwargs
@ -299,7 +345,7 @@ def deserialize_remote_exception(conf, data):
if not issubclass(klass, Exception):
raise TypeError("Can only deserialize Exceptions")
failure = klass(**failure.get('kwargs', {}))
failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
except (AttributeError, TypeError, ImportError):
return RemoteError(name, failure.get('message'), trace)
@ -379,7 +425,7 @@ class ClientException(Exception):
def catch_client_exception(exceptions, func, *args, **kwargs):
try:
return func(*args, **kwargs)
except Exception, e:
except Exception as e:
if type(e) in exceptions:
raise ClientException()
else:
@ -415,10 +461,7 @@ def version_is_compatible(imp_version, version):
return True
def serialize_msg(raw_msg, force_envelope=False):
if not _SEND_RPC_ENVELOPE and not force_envelope:
return raw_msg
def serialize_msg(raw_msg):
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,

View File

@ -84,6 +84,7 @@ minimum version that supports the new parameter should be specified.
"""
from heat.openstack.common.rpc import common as rpc_common
from heat.openstack.common.rpc import serializer as rpc_serializer
class RpcDispatcher(object):
@ -93,23 +94,48 @@ class RpcDispatcher(object):
contains a list of underlying managers that have an API_VERSION attribute.
"""
def __init__(self, callbacks):
def __init__(self, callbacks, serializer=None):
"""Initialize the rpc dispatcher.
:param callbacks: List of proxy objects that are an instance
of a class with rpc methods exposed. Each proxy
object should have an RPC_API_VERSION attribute.
:param serializer: The Serializer object that will be used to
deserialize arguments before the method call and
to serialize the result after it returns.
"""
self.callbacks = callbacks
if serializer is None:
serializer = rpc_serializer.NoOpSerializer()
self.serializer = serializer
super(RpcDispatcher, self).__init__()
def dispatch(self, ctxt, version, method, **kwargs):
def _deserialize_args(self, context, kwargs):
"""Helper method called to deserialize args before dispatch.
This calls our serializer on each argument, returning a new set of
args that have been deserialized.
:param context: The request context
:param kwargs: The arguments to be deserialized
:returns: A new set of deserialized args
"""
new_kwargs = dict()
for argname, arg in kwargs.iteritems():
new_kwargs[argname] = self.serializer.deserialize_entity(context,
arg)
return new_kwargs
def dispatch(self, ctxt, version, method, namespace, **kwargs):
"""Dispatch a message based on a requested version.
:param ctxt: The request context
:param version: The requested API version from the incoming message
:param method: The method requested to be called by the incoming
message.
:param namespace: The namespace for the requested method. If None,
the dispatcher will look for a method on a callback
object with no namespace set.
:param kwargs: A dict of keyword arguments to be passed to the method.
:returns: Whatever is returned by the underlying method that gets
@ -120,17 +146,31 @@ class RpcDispatcher(object):
had_compatible = False
for proxyobj in self.callbacks:
if hasattr(proxyobj, 'RPC_API_VERSION'):
# Check for namespace compatibility
try:
cb_namespace = proxyobj.RPC_API_NAMESPACE
except AttributeError:
cb_namespace = None
if namespace != cb_namespace:
continue
# Check for version compatibility
try:
rpc_api_version = proxyobj.RPC_API_VERSION
else:
except AttributeError:
rpc_api_version = '1.0'
is_compatible = rpc_common.version_is_compatible(rpc_api_version,
version)
had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method):
continue
if is_compatible:
return getattr(proxyobj, method)(ctxt, **kwargs)
kwargs = self._deserialize_args(ctxt, kwargs)
result = getattr(proxyobj, method)(ctxt, **kwargs)
return self.serializer.serialize_entity(ctxt, result)
if had_compatible:
raise AttributeError("No such RPC function '%s'" % method)

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -57,13 +57,14 @@ class Consumer(object):
self.topic = topic
self.proxy = proxy
def call(self, context, version, method, args, timeout):
def call(self, context, version, method, namespace, args, timeout):
done = eventlet.event.Event()
def _inner():
ctxt = RpcContext.from_dict(context.to_dict())
try:
rval = self.proxy.dispatch(context, version, method, **args)
rval = self.proxy.dispatch(context, version, method,
namespace, **args)
res = []
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None):
return
args = msg.get('args', {})
version = msg.get('version', None)
namespace = msg.get('namespace', None)
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
return iter([None])
else:
return consumer.call(context, version, method, args, timeout)
return consumer.call(context, version, method, namespace, args,
timeout)
def call(conf, context, topic, msg, timeout=None):
@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg):
return
args = msg.get('args', {})
version = msg.get('version', None)
namespace = msg.get('namespace', None)
for consumer in CONSUMERS.get(topic, []):
try:
consumer.call(context, version, method, args, None)
consumer.call(context, version, method, namespace, args, None)
except Exception:
pass

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -66,7 +66,8 @@ kombu_opts = [
help='the RabbitMQ userid'),
cfg.StrOpt('rabbit_password',
default='guest',
help='the RabbitMQ password'),
help='the RabbitMQ password',
secret=True),
cfg.StrOpt('rabbit_virtual_host',
default='/',
help='the RabbitMQ virtual host'),
@ -164,9 +165,10 @@ class ConsumerBase(object):
try:
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
message.ack()
self.queue.consume(*args, callback=_callback, **options)
@ -174,7 +176,7 @@ class ConsumerBase(object):
"""Cancel the consuming from the queue, if it has started"""
try:
self.queue.cancel(self.tag)
except KeyError, e:
except KeyError as e:
# NOTE(comstud): Kludge to get around a amqplib bug
if str(e) != "u'%s'" % self.tag:
raise
@ -196,6 +198,7 @@ class DirectConsumer(ConsumerBase):
"""
# Default options
options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
'exclusive': False}
options.update(kwargs)
@ -517,7 +520,7 @@ class Connection(object):
return
except (IOError, self.connection_errors) as e:
pass
except Exception, e:
except Exception as e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
@ -558,10 +561,10 @@ class Connection(object):
while True:
try:
return method(*args, **kwargs)
except (self.connection_errors, socket.timeout, IOError), e:
except (self.connection_errors, socket.timeout, IOError) as e:
if error_callback:
error_callback(e)
except Exception, e:
except Exception as e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
@ -621,8 +624,8 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
LOG.debug(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
@ -749,6 +752,30 @@ class Connection(object):
self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
Exactly one member of a given pool will receive each message.
A message will be delivered to multiple pools, if more than
one is created.
"""
callback_wrapper = rpc_amqp.CallbackWrapper(
conf=self.conf,
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
)
self.proxy_callbacks.append(callback_wrapper)
self.declare_topic_consumer(
queue_name=pool_name,
topic=topic,
exchange_name=exchange_name,
callback=callback_wrapper,
)
def create_connection(conf, new=True):
"""Create a connection"""

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
# Copyright 2011 OpenStack Foundation
# Copyright 2011 - 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -40,8 +40,8 @@ qpid_opts = [
cfg.StrOpt('qpid_hostname',
default='localhost',
help='Qpid broker hostname'),
cfg.StrOpt('qpid_port',
default='5672',
cfg.IntOpt('qpid_port',
default=5672,
help='Qpid broker port'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
@ -51,7 +51,8 @@ qpid_opts = [
help='Username for qpid connection'),
cfg.StrOpt('qpid_password',
default='',
help='Password for qpid connection'),
help='Password for qpid connection',
secret=True),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
help='Space separated list of SASL mechanisms to use for auth'),
@ -319,7 +320,7 @@ class Connection(object):
# Reconnection is done by self.reconnect()
self.connection.reconnect = False
self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.transport = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
def _register_consumer(self, consumer):
@ -330,22 +331,23 @@ class Connection(object):
def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues"""
if self.connection.opened():
try:
self.connection.close()
except qpid_exceptions.ConnectionError:
pass
attempt = 0
delay = 1
while True:
# Close the session if necessary
if self.connection.opened():
try:
self.connection.close()
except qpid_exceptions.ConnectionError:
pass
broker = self.brokers[attempt % len(self.brokers)]
attempt += 1
try:
self.connection_create(broker)
self.connection.open()
except qpid_exceptions.ConnectionError, e:
except qpid_exceptions.ConnectionError as e:
msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
@ -373,7 +375,7 @@ class Connection(object):
try:
return method(*args, **kwargs)
except (qpid_exceptions.Empty,
qpid_exceptions.ConnectionError), e:
qpid_exceptions.ConnectionError) as e:
if error_callback:
error_callback(e)
self.reconnect()
@ -414,8 +416,8 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
LOG.debug(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
@ -559,6 +561,34 @@ class Connection(object):
return consumer
def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
Exactly one member of a given pool will receive each message.
A message will be delivered to multiple pools, if more than
one is created.
"""
callback_wrapper = rpc_amqp.CallbackWrapper(
conf=self.conf,
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
)
self.proxy_callbacks.append(callback_wrapper)
consumer = TopicConsumer(conf=self.conf,
session=self.session,
topic=topic,
callback=callback_wrapper,
name=pool_name,
exchange_name=exchange_name)
self._register_consumer(consumer)
return consumer
def create_connection(conf, new=True):
"""Create a connection"""

View File

@ -16,6 +16,7 @@
import os
import pprint
import re
import socket
import sys
import types
@ -25,6 +26,7 @@ import eventlet
import greenlet
from oslo.config import cfg
from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
@ -89,10 +91,10 @@ def _serialize(data):
Error if a developer passes us bad data.
"""
try:
return str(jsonutils.dumps(data, ensure_ascii=True))
return jsonutils.dumps(data, ensure_ascii=True)
except TypeError:
LOG.error(_("JSON serialization failed."))
raise
with excutils.save_and_reraise_exception():
LOG.error(_("JSON serialization failed."))
def _deserialize(data):
@ -178,7 +180,7 @@ class ZmqSocket(object):
return
# We must unsubscribe, or we'll leak descriptors.
if len(self.subscriptions) > 0:
if self.subscriptions:
for f in self.subscriptions:
try:
self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
@ -216,11 +218,18 @@ class ZmqClient(object):
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)])
def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
if not envelope:
self.outq.send(map(bytes,
(msg_id, topic, 'cast', _serialize(data))))
return
rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
self.outq.send(map(bytes,
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
def close(self):
self.outq.close()
@ -267,12 +276,13 @@ class InternalContext(object):
try:
result = proxy.dispatch(
ctx, data['version'], data['method'], **data['args'])
ctx, data['version'], data['method'],
data.get('namespace'), **data['args'])
return ConsumerBase.normalize_reply(result, ctx.replies)
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
pass
except rpc_common.ClientException, e:
except rpc_common.ClientException as e:
LOG.debug(_("Expected exception during message handling (%s)") %
e._exc_info[1])
return {'exc':
@ -286,21 +296,26 @@ class InternalContext(object):
def reply(self, ctx, proxy,
msg_id=None, context=None, topic=None, msg=None):
"""Reply to a casted call."""
# Our real method is curried into msg['args']
# NOTE(ewindisch): context kwarg exists for Grizzly compat.
# this may be able to be removed earlier than
# 'I' if ConsumerBase.process were refactored.
if type(msg) is list:
payload = msg[-1]
else:
payload = msg
child_ctx = RpcContext.unmarshal(msg[0])
response = ConsumerBase.normalize_reply(
self._get_response(child_ctx, proxy, topic, msg[1]),
self._get_response(ctx, proxy, topic, payload),
ctx.replies)
LOG.debug(_("Sending reply"))
cast(CONF, ctx, topic, {
_multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {
'msg_id': msg_id,
'msg_id': msg_id, # Include for Folsom compat.
'response': response
}
})
}, _msg_id=msg_id)
class ConsumerBase(object):
@ -319,7 +334,7 @@ class ConsumerBase(object):
else:
return [result]
def process(self, style, target, proxy, ctx, data):
def process(self, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
@ -337,7 +352,7 @@ class ConsumerBase(object):
return
proxy.dispatch(ctx, data['version'],
data['method'], **data['args'])
data['method'], data.get('namespace'), **data['args'])
class ZmqBaseReactor(ConsumerBase):
@ -423,6 +438,8 @@ class ZmqProxy(ZmqBaseReactor):
def __init__(self, conf):
super(ZmqProxy, self).__init__(conf)
pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
self.topic_proxy = {}
@ -431,21 +448,15 @@ class ZmqProxy(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
msg_id, topic, style, in_msg = data
topic = topic.split('.', 1)[0]
topic = data[1]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
# Handle zmq_replies magic
if topic.startswith('fanout~'):
sock_type = zmq.PUB
topic = topic.split('.', 1)[0]
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
data = [str(msg_id), _serialize(response)]
else:
sock_type = zmq.PUSH
@ -454,6 +465,13 @@ class ZmqProxy(ZmqBaseReactor):
LOG.info(_("Creating proxy for topic: %s"), topic)
try:
# The topic is received over the network,
# don't trust this input.
if self.badchars.search(topic) is not None:
emsg = _("Topic contained dangerous characters.")
LOG.warn(emsg)
raise RPCException(emsg)
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
(ipc_dir, topic),
sock_type, bind=True)
@ -510,9 +528,9 @@ class ZmqProxy(ZmqBaseReactor):
ipc_dir, run_as_root=True)
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
except utils.ProcessExecutionError:
LOG.error(_("Could not create IPC directory %s") %
(ipc_dir, ))
raise
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create IPC directory %s") %
(ipc_dir, ))
try:
self.register(consumption_proxy,
@ -520,13 +538,28 @@ class ZmqProxy(ZmqBaseReactor):
zmq.PULL,
out_bind=True)
except zmq.ZMQError:
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
raise
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
super(ZmqProxy, self).consume_in_thread()
def unflatten_envelope(packenv):
"""Unflattens the RPC envelope.
Takes a list and returns a dictionary.
i.e. [1,2,3,4] => {1: 2, 3: 4}
"""
i = iter(packenv)
h = {}
try:
while True:
k = i.next()
h[k] = i.next()
except StopIteration:
return h
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
@ -547,38 +580,53 @@ class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
msg_id, topic, style, in_msg = data
ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
self.pool.spawn_n(self.process, style, topic,
proxy, ctx, request)
if data[2] == 'cast': # Legacy protocol
packenv = data[3]
ctx, msg = _deserialize(packenv)
request = rpc_common.deserialize_msg(msg)
ctx = RpcContext.unmarshal(ctx)
elif data[2] == 'impl_zmq_v2':
packenv = data[4:]
msg = unflatten_envelope(packenv)
request = rpc_common.deserialize_msg(msg)
# Unmarshal only after verifying the message.
ctx = RpcContext.unmarshal(data[3])
else:
LOG.error(_("ZMQ Envelope version unsupported or unknown."))
return
self.pool.spawn_n(self.process, proxy, ctx, request)
class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
self.topics = []
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
LOG.info(_("Create Consumer for topic (%(topic)s)") %
{'topic': topic})
# Register with matchmaker.
_get_matchmaker().register(topic, CONF.rpc_zmq_host)
# Subscription scenarios
if fanout:
subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
topic = 'fanout~' + topic
subscribe = ('', fanout)[type(fanout) == str]
topic = 'fanout~' + topic.split('.', 1)[0]
else:
sock_type = zmq.PULL
subscribe = None
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
if topic in self.topics:
LOG.info(_("Skipping topic registration. Already registered."))
return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
@ -589,19 +637,26 @@ class Connection(rpc_common.Connection):
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
self.topics.append(topic)
def close(self):
_get_matchmaker().stop_heartbeat()
for topic in self.topics:
_get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
self.reactor.close()
self.topics = []
def wait(self):
self.reactor.wait()
def consume_in_thread(self):
_get_matchmaker().start_heartbeat()
self.reactor.consume_in_thread()
def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
force_envelope=False):
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
_msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@ -610,7 +665,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)
# assumes cast can't return an exception
conn.cast(msg_id, topic, payload, serialize, force_envelope)
conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@ -618,8 +673,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn.close()
def _call(addr, context, msg_id, topic, msg, timeout=None,
serialize=True, force_envelope=False):
def _call(addr, context, topic, msg, timeout=None,
envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@ -636,8 +691,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
'method': '-reply',
'args': {
'msg_id': msg_id,
'context': mcontext,
'topic': reply_topic,
# TODO(ewindisch): safe to remove mcontext in I.
'msg': [mcontext, msg]
}
}
@ -649,23 +704,36 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
"ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
"ipc://%s/zmq_topic_zmq_replies.%s" %
(CONF.rpc_zmq_ipc_dir,
CONF.rpc_zmq_host),
zmq.SUB, subscribe=msg_id, bind=False
)
LOG.debug(_("Sending cast"))
_cast(addr, context, msg_id, topic, payload,
serialize=serialize, force_envelope=force_envelope)
_cast(addr, context, topic, payload, envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
responses = _deserialize(msg[-1])
if msg[2] == 'cast': # Legacy version
raw_msg = _deserialize(msg[-1])[-1]
elif msg[2] == 'impl_zmq_v2':
rpc_envelope = unflatten_envelope(msg[4:])
raw_msg = rpc_common.deserialize_msg(rpc_envelope)
else:
raise rpc_common.UnsupportedRpcEnvelopeVersion(
_("Unsupported or unknown ZMQ envelope returned."))
responses = raw_msg['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
except (IndexError, KeyError):
raise RPCException(_("RPC Message Invalid."))
finally:
if 'msg_waiter' in vars():
msg_waiter.close()
@ -681,8 +749,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False):
def _multi_send(method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@ -695,11 +763,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
if len(queues) == 0:
if not queues:
LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie.
raise rpc_common.Timeout, "No match from matchmaker."
raise rpc_common.Timeout(_("No match from matchmaker."))
# This supports brokerless fanout (addresses > 1)
for queue in queues:
@ -708,11 +776,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
_topic, _topic, msg, timeout, serialize,
force_envelope)
_topic, msg, timeout, envelope,
_msg_id)
return
return method(_addr, context, _topic, _topic, msg, timeout,
serialize, force_envelope)
return method(_addr, context, _topic, msg, timeout,
envelope)
def create_connection(conf, new=True):
@ -742,7 +810,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
def notify(conf, context, topic, msg, **kwargs):
def notify(conf, context, topic, msg, envelope):
"""
Send notification event.
Notifications are sent to topic-priority.
@ -750,10 +818,8 @@ def notify(conf, context, topic, msg, **kwargs):
"""
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
kwargs['serialize'] = kwargs.pop('envelope')
kwargs['force_envelope'] = True
cast(conf, context, topic, msg, **kwargs)
topic = topic.replace('.', '-')
cast(conf, context, topic, msg, envelope=envelope)
def cleanup():
@ -777,8 +843,14 @@ def _get_ctxt():
return ZMQ_CTX
def _get_matchmaker():
def _get_matchmaker(*args, **kwargs):
global matchmaker
if not matchmaker:
matchmaker = importutils.import_object(CONF.rpc_zmq_matchmaker)
mm = CONF.rpc_zmq_matchmaker
if mm.endswith('matchmaker.MatchMakerRing'):
mm.replace('matchmaker', 'matchmaker_ring')
LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
' %(new)s instead') % dict(
orig=CONF.rpc_zmq_matchmaker, new=mm))
matchmaker = importutils.import_object(mm, *args, **kwargs)
return matchmaker

View File

@ -19,9 +19,8 @@ return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import contextlib
import itertools
import json
import eventlet
from oslo.config import cfg
from heat.openstack.common.gettextutils import _
@ -29,10 +28,12 @@ from heat.openstack.common import log as logging
matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('matchmaker_ringfile',
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
cfg.IntOpt('matchmaker_heartbeat_freq',
default=300,
help='Heartbeat frequency'),
cfg.IntOpt('matchmaker_heartbeat_ttl',
default=600,
help='Heartbeat time-to-live.'),
]
CONF = cfg.CONF
@ -70,12 +71,73 @@ class Binding(object):
class MatchMakerBase(object):
"""Match Maker Base Class."""
"""
Match Maker Base Class.
Build off HeartbeatMatchMakerBase if building a
heartbeat-capable MatchMaker.
"""
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
self.bindings = []
self.no_heartbeat_msg = _('Matchmaker does not implement '
'registration or heartbeat.')
def register(self, key, host):
"""
Register a host on a backend.
Heartbeats, if applicable, may keepalive registration.
"""
pass
def ack_alive(self, key, host):
"""
Acknowledge that a key.host is alive.
Used internally for updating heartbeats,
but may also be used publically to acknowledge
a system is alive (i.e. rpc message successfully
sent to host)
"""
pass
def is_alive(self, topic, host):
"""
Checks if a host is alive.
"""
pass
def expire(self, topic, host):
"""
Explicitly expire a host's registration.
"""
pass
def send_heartbeats(self):
"""
Send all heartbeats.
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
pass
def unregister(self, key, host):
"""
Unregister a topic.
"""
pass
def start_heartbeat(self):
"""
Spawn heartbeat greenthread.
"""
pass
def stop_heartbeat(self):
"""
Destroys the heartbeat greenthread.
"""
pass
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))
@ -99,6 +161,104 @@ class MatchMakerBase(object):
return workers
class HeartbeatMatchMakerBase(MatchMakerBase):
"""
Base for a heart-beat capable MatchMaker.
Provides common methods for registering,
unregistering, and maintaining heartbeats.
"""
def __init__(self):
self.hosts = set()
self._heart = None
self.host_topic = {}
super(HeartbeatMatchMakerBase, self).__init__()
def send_heartbeats(self):
"""
Send all heartbeats.
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
for key, host in self.host_topic:
self.ack_alive(key, host)
def ack_alive(self, key, host):
"""
Acknowledge that a host.topic is alive.
Used internally for updating heartbeats,
but may also be used publically to acknowledge
a system is alive (i.e. rpc message successfully
sent to host)
"""
raise NotImplementedError("Must implement ack_alive")
def backend_register(self, key, host):
"""
Implements registration logic.
Called by register(self,key,host)
"""
raise NotImplementedError("Must implement backend_register")
def backend_unregister(self, key, key_host):
"""
Implements de-registration logic.
Called by unregister(self,key,host)
"""
raise NotImplementedError("Must implement backend_unregister")
def register(self, key, host):
"""
Register a host on a backend.
Heartbeats, if applicable, may keepalive registration.
"""
self.hosts.add(host)
self.host_topic[(key, host)] = host
key_host = '.'.join((key, host))
self.backend_register(key, key_host)
self.ack_alive(key, host)
def unregister(self, key, host):
"""
Unregister a topic.
"""
if (key, host) in self.host_topic:
del self.host_topic[(key, host)]
self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host)))
LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
{'key': key, 'host': host})
def start_heartbeat(self):
"""
Implementation of MatchMakerBase.start_heartbeat
Launches greenthread looping send_heartbeats(),
yielding for CONF.matchmaker_heartbeat_freq seconds
between iterations.
"""
if not self.hosts:
raise MatchMakerException(
_("Register before starting heartbeat."))
def do_heartbeat():
while True:
self.send_heartbeats()
eventlet.sleep(CONF.matchmaker_heartbeat_freq)
self._heart = eventlet.spawn(do_heartbeat)
def stop_heartbeat(self):
"""
Destroys the heartbeat greenthread.
"""
if self._heart:
self._heart.kill()
class DirectBinding(Binding):
"""
Specifies a host in the key via a '.' character
@ -139,98 +299,27 @@ class StubExchange(Exchange):
return [(key, None)]
class RingExchange(Exchange):
"""
Match Maker where hosts are loaded from a static file containing
a hashmap (JSON formatted).
__init__ takes optional ring dictionary argument, otherwise
loads the ringfile from CONF.mathcmaker_ringfile.
"""
def __init__(self, ring=None):
super(RingExchange, self).__init__()
if ring:
self.ring = ring
else:
fh = open(CONF.matchmaker_ringfile, 'r')
self.ring = json.load(fh)
fh.close()
self.ring0 = {}
for k in self.ring.keys():
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
if key in self.ring0:
return True
return False
class RoundRobinRingExchange(RingExchange):
"""A Topic Exchange based on a hashmap."""
def __init__(self, ring=None):
super(RoundRobinRingExchange, self).__init__(ring)
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (key, )
)
return []
host = next(self.ring0[key])
return [(key + '.' + host, host)]
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(FanoutRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "fanout~", strip it for lookup.
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
def __init__(self):
def __init__(self, host='localhost'):
self.host = host
super(Exchange, self).__init__()
def run(self, key):
return [(key.split('.')[0] + '.localhost', 'localhost')]
return [('.'.join((key.split('.')[0], self.host)), self.host)]
class DirectExchange(Exchange):
"""
Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute" running on "host"
i.e. "compute.host" sends a message to "compute.host" running on "host"
"""
def __init__(self):
super(Exchange, self).__init__()
def run(self, key):
b, e = key.split('.', 1)
return [(b, e)]
class MatchMakerRing(MatchMakerBase):
"""
Match Maker where hosts are loaded from a static hashmap.
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
e = key.split('.', 1)[1]
return [(key, e)]
class MatchMakerLocalhost(MatchMakerBase):
@ -238,11 +327,11 @@ class MatchMakerLocalhost(MatchMakerBase):
Match Maker where all bare topics resolve to localhost.
Useful for testing.
"""
def __init__(self):
def __init__(self, host='localhost'):
super(MatchMakerLocalhost, self).__init__()
self.add_binding(FanoutBinding(), LocalhostExchange())
self.add_binding(FanoutBinding(), LocalhostExchange(host))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange())
self.add_binding(TopicBinding(), LocalhostExchange(host))
class MatchMakerStub(MatchMakerBase):

View File

@ -0,0 +1,149 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should accept a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
from oslo.config import cfg
from heat.openstack.common import importutils
from heat.openstack.common import log as logging
from heat.openstack.common.rpc import matchmaker as mm_common
redis = importutils.try_import('redis')
matchmaker_redis_opts = [
cfg.StrOpt('host',
default='127.0.0.1',
help='Host to locate redis'),
cfg.IntOpt('port',
default=6379,
help='Use this port to connect to redis host.'),
cfg.StrOpt('password',
default=None,
help='Password for Redis server. (optional)'),
]
CONF = cfg.CONF
opt_group = cfg.OptGroup(name='matchmaker_redis',
title='Options for Redis-based MatchMaker')
CONF.register_group(opt_group)
CONF.register_opts(matchmaker_redis_opts, opt_group)
LOG = logging.getLogger(__name__)
class RedisExchange(mm_common.Exchange):
def __init__(self, matchmaker):
self.matchmaker = matchmaker
self.redis = matchmaker.redis
super(RedisExchange, self).__init__()
class RedisTopicExchange(RedisExchange):
"""
Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute" running on "host"
"""
def run(self, topic):
while True:
member_name = self.redis.srandmember(topic)
if not member_name:
# If this happens, there are no
# longer any members.
break
if not self.matchmaker.is_alive(topic, member_name):
continue
host = member_name.split('.', 1)[1]
return [(member_name, host)]
return []
class RedisFanoutExchange(RedisExchange):
"""
Return a list of all hosts.
"""
def run(self, topic):
topic = topic.split('~', 1)[1]
hosts = self.redis.smembers(topic)
good_hosts = filter(
lambda host: self.matchmaker.is_alive(topic, host), hosts)
return [(x, x.split('.', 1)[1]) for x in good_hosts]
class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
"""
MatchMaker registering and looking-up hosts with a Redis server.
"""
def __init__(self):
super(MatchMakerRedis, self).__init__()
if not redis:
raise ImportError("Failed to import module redis.")
self.redis = redis.StrictRedis(
host=CONF.matchmaker_redis.host,
port=CONF.matchmaker_redis.port,
password=CONF.matchmaker_redis.password)
self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
def ack_alive(self, key, host):
topic = "%s.%s" % (key, host)
if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
# If we could not update the expiration, the key
# might have been pruned. Re-register, creating a new
# key in Redis.
self.register(self.topic_host[host], host)
def is_alive(self, topic, host):
if self.redis.ttl(host) == -1:
self.expire(topic, host)
return False
return True
def expire(self, topic, host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.delete(host)
pipe.srem(topic, host)
pipe.execute()
def backend_register(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.sadd(key, key_host)
# No value is needed, we just
# care if it exists. Sets aren't viable
# because only keys can expire.
pipe.set(key_host, '')
pipe.execute()
def backend_unregister(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.srem(key, key_host)
pipe.delete(key_host)
pipe.execute()

View File

@ -0,0 +1,114 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011-2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should except a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import itertools
import json
from oslo.config import cfg
from heat.openstack.common.gettextutils import _
from heat.openstack.common import log as logging
from heat.openstack.common.rpc import matchmaker as mm
matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('ringfile',
deprecated_name='matchmaker_ringfile',
deprecated_group='DEFAULT',
default='/etc/oslo/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
]
CONF = cfg.CONF
CONF.register_opts(matchmaker_opts, 'matchmaker_ring')
LOG = logging.getLogger(__name__)
class RingExchange(mm.Exchange):
"""
Match Maker where hosts are loaded from a static file containing
a hashmap (JSON formatted).
__init__ takes optional ring dictionary argument, otherwise
loads the ringfile from CONF.mathcmaker_ringfile.
"""
def __init__(self, ring=None):
super(RingExchange, self).__init__()
if ring:
self.ring = ring
else:
fh = open(CONF.matchmaker_ring.ringfile, 'r')
self.ring = json.load(fh)
fh.close()
self.ring0 = {}
for k in self.ring.keys():
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
if key in self.ring0:
return True
return False
class RoundRobinRingExchange(RingExchange):
"""A Topic Exchange based on a hashmap."""
def __init__(self, ring=None):
super(RoundRobinRingExchange, self).__init__(ring)
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (key, )
)
return []
host = next(self.ring0[key])
return [(key + '.' + host, host)]
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(FanoutRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "fanout~", strip it for lookup.
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class MatchMakerRing(mm.MatchMakerBase):
"""
Match Maker where hosts are loaded from a static hashmap.
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(mm.DirectBinding(), mm.DirectExchange())
self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring))

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Red Hat, Inc.
# Copyright 2012-2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -23,6 +23,8 @@ For more information about rpc API version numbers, see:
from heat.openstack.common import rpc
from heat.openstack.common.rpc import common as rpc_common
from heat.openstack.common.rpc import serializer as rpc_serializer
class RpcProxy(object):
@ -34,16 +36,28 @@ class RpcProxy(object):
rpc API.
"""
def __init__(self, topic, default_version):
# The default namespace, which can be overriden in a subclass.
RPC_API_NAMESPACE = None
def __init__(self, topic, default_version, version_cap=None,
serializer=None):
"""Initialize an RpcProxy.
:param topic: The topic to use for all messages.
:param default_version: The default API version to request in all
outgoing messages. This can be overridden on a per-message
basis.
:param version_cap: Optionally cap the maximum version used for sent
messages.
:param serializer: Optionaly (de-)serialize entities with a
provided helper.
"""
self.topic = topic
self.default_version = default_version
self.version_cap = version_cap
if serializer is None:
serializer = rpc_serializer.NoOpSerializer()
self.serializer = serializer
super(RpcProxy, self).__init__()
def _set_version(self, msg, vers):
@ -52,15 +66,39 @@ class RpcProxy(object):
:param msg: The message having a version added to it.
:param vers: The version number to add to the message.
"""
msg['version'] = vers if vers else self.default_version
v = vers if vers else self.default_version
if (self.version_cap and not
rpc_common.version_is_compatible(self.version_cap, v)):
raise rpc_common.RpcVersionCapError(version=self.version_cap)
msg['version'] = v
def _get_topic(self, topic):
"""Return the topic to use for a message."""
return topic if topic else self.topic
@staticmethod
def make_msg(method, **kwargs):
return {'method': method, 'args': kwargs}
def make_namespaced_msg(method, namespace, **kwargs):
return {'method': method, 'namespace': namespace, 'args': kwargs}
def make_msg(self, method, **kwargs):
return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
**kwargs)
def _serialize_msg_args(self, context, kwargs):
"""Helper method called to serialize message arguments.
This calls our serializer on each argument, returning a new
set of args that have been serialized.
:param context: The request context
:param kwargs: The arguments to serialize
:returns: A new set of serialized arguments
"""
new_kwargs = dict()
for argname, arg in kwargs.iteritems():
new_kwargs[argname] = self.serializer.serialize_entity(context,
arg)
return new_kwargs
def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method.
@ -68,16 +106,23 @@ class RpcProxy(object):
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
:param version: (Optional) Override the requested API version in this
message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
:param version: (Optional) Override the requested API version in this
message.
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
return rpc.call(context, self._get_topic(topic), msg, timeout)
msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
result = rpc.call(context, real_topic, msg, timeout)
return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
def multicall(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.multicall() a remote method.
@ -85,17 +130,24 @@ class RpcProxy(object):
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
:param version: (Optional) Override the requested API version in this
message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
:param version: (Optional) Override the requested API version in this
message.
:returns: An iterator that lets you process each of the returned values
from the remote method as they arrive.
"""
self._set_version(msg, version)
return rpc.multicall(context, self._get_topic(topic), msg, timeout)
msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
result = rpc.multicall(context, real_topic, msg, timeout)
return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
def cast(self, context, msg, topic=None, version=None):
"""rpc.cast() a remote method.
@ -110,6 +162,7 @@ class RpcProxy(object):
remote method.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast(context, self._get_topic(topic), msg)
def fanout_cast(self, context, msg, topic=None, version=None):
@ -125,6 +178,7 @@ class RpcProxy(object):
from the remote method.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast(context, self._get_topic(topic), msg)
def cast_to_server(self, context, server_params, msg, topic=None,
@ -143,6 +197,7 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
@ -161,5 +216,6 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast_to_server(context, server_params,
self._get_topic(topic), msg)

View File

@ -0,0 +1,52 @@
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Provides the definition of an RPC serialization handler"""
import abc
class Serializer(object):
"""Generic (de-)serialization definition base class"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def serialize_entity(self, context, entity):
"""Serialize something to primitive form.
:param context: Security context
:param entity: Entity to be serialized
:returns: Serialized form of entity
"""
pass
@abc.abstractmethod
def deserialize_entity(self, context, entity):
"""Deserialize something from primitive form.
:param context: Security context
:param entity: Primitive to be deserialized
:returns: Deserialized form of entity
"""
pass
class NoOpSerializer(Serializer):
"""A serializer that does nothing"""
def serialize_entity(self, context, entity):
return entity
def deserialize_entity(self, context, entity):
return entity

View File

@ -0,0 +1,41 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
eventlet.monkey_patch()
import contextlib
import sys
from oslo.config import cfg
from heat.openstack.common import log as logging
from heat.openstack.common import rpc
from heat.openstack.common.rpc import impl_zmq
CONF = cfg.CONF
CONF.register_opts(rpc.rpc_opts)
CONF.register_opts(impl_zmq.zmq_opts)
def main():
CONF(sys.argv[1:], project='oslo')
logging.setup("oslo")
with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
reactor.consume_in_thread()
reactor.wait()

View File

@ -20,7 +20,6 @@
"""Generic Node base class for all workers that run on hosts."""
import errno
import logging as std_logging
import os
import random
import signal
@ -28,6 +27,7 @@ import sys
import time
import eventlet
import logging as std_logging
from oslo.config import cfg
from heat.openstack.common import eventlet_backdoor
@ -52,7 +52,7 @@ class Launcher(object):
"""
self._services = threadgroup.ThreadGroup()
eventlet_backdoor.initialize_if_enabled()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
@staticmethod
def run_service(service):
@ -72,6 +72,7 @@ class Launcher(object):
:returns: None
"""
service.backdoor_port = self.backdoor_port
self._services.add_thread(self.run_service, service)
def stop(self):

View File

@ -61,6 +61,13 @@ class ThreadGroup(object):
self.threads = []
self.timers = []
def add_dynamic_timer(self, callback, initial_delay=None,
periodic_interval_max=None, *args, **kwargs):
timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
timer.start(initial_delay=initial_delay,
periodic_interval_max=periodic_interval_max)
self.timers.append(timer)
def add_timer(self, interval, callback, initial_delay=None,
*args, **kwargs):
pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -25,18 +25,22 @@ import datetime
import iso8601
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
# ISO 8601 extended time format with microseconds
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
def isotime(at=None):
def isotime(at=None, subsecond=False):
"""Stringify time in ISO 8601 format"""
if not at:
at = utcnow()
str = at.strftime(TIME_FORMAT)
st = at.strftime(_ISO8601_TIME_FORMAT
if not subsecond
else _ISO8601_TIME_FORMAT_SUBSECOND)
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
str += ('Z' if tz == 'UTC' else tz)
return str
st += ('Z' if tz == 'UTC' else tz)
return st
def parse_isotime(timestr):
@ -179,4 +183,4 @@ def is_soon(dt, window):
:return: True if expiration is within the given duration
"""
soon = (utcnow() + datetime.timedelta(seconds=window))
return normalize_time(dt) < soon
return normalize_time(dt) <= soon

View File

@ -223,19 +223,19 @@ class EngineClient(heat.openstack.common.rpc.proxy.RpcProxy):
return self.call(ctxt, self.make_msg('show_watch',
watch_name=watch_name))
def show_watch_metric(self, ctxt, namespace=None, metric_name=None):
def show_watch_metric(self, ctxt, metric_namespace=None, metric_name=None):
"""
The show_watch_metric method returns the datapoints associated
with a specified metric, or all metrics if no metric_name is passed
:param ctxt: RPC context.
:param namespace: Name of the namespace you want to see,
:param metric_namespace: Name of the namespace you want to see,
or None to see all
:param metric_name: Name of the metric you want to see,
or None to see all
"""
return self.call(ctxt, self.make_msg('show_watch_metric',
namespace=namespace,
metric_namespace=metric_namespace,
metric_name=metric_name))
def set_watch_state(self, ctxt, watch_name, state):

View File

@ -125,7 +125,8 @@ class CfnStackControllerTest(HeatTestCase):
u'stack_status': u'CREATE_COMPLETE'}]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'list_stacks',
{'namespace': None,
'method': 'list_stacks',
'args': {},
'version': self.api_version},
None).AndReturn(engine_resp)
@ -154,7 +155,8 @@ class CfnStackControllerTest(HeatTestCase):
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'list_stacks',
{'namespace': None,
'method': 'list_stacks',
'args': {},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("AttributeError"))
@ -175,7 +177,8 @@ class CfnStackControllerTest(HeatTestCase):
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'list_stacks',
{'namespace': None,
'method': 'list_stacks',
'args': {},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("Exception"))
@ -225,11 +228,13 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
{'method': 'show_stack',
{'namespace': None,
'method': 'show_stack',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(engine_resp)
@ -312,7 +317,8 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'show_stack',
{'namespace': None,
'method': 'show_stack',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(engine_resp)
@ -367,7 +373,8 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'show_stack',
{'namespace': None,
'method': 'show_stack',
'args': {'stack_identity': identity},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("InvalidTenant"))
@ -389,11 +396,13 @@ class CfnStackControllerTest(HeatTestCase):
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
{'method': 'show_stack',
{'namespace': None,
'method': 'show_stack',
'args': {'stack_identity': identity},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("AttributeError"))
@ -414,7 +423,8 @@ class CfnStackControllerTest(HeatTestCase):
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -459,7 +469,8 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'create_stack',
{'namespace': None,
'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': engine_parms,
@ -521,7 +532,8 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'create_stack',
{'namespace': None,
'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': engine_parms,
@ -556,7 +568,8 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'create_stack',
{'namespace': None,
'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': engine_parms,
@ -590,7 +603,8 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'create_stack',
{'namespace': None,
'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': engine_parms,
@ -626,12 +640,14 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
{'method': 'update_stack',
{'namespace': None,
'method': 'update_stack',
'args': {'stack_identity': identity,
'template': template,
'params': engine_parms,
@ -668,7 +684,8 @@ class CfnStackControllerTest(HeatTestCase):
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -697,11 +714,13 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
{'method': 'get_template',
{'namespace': None,
'method': 'get_template',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(engine_resp)
@ -727,11 +746,13 @@ class CfnStackControllerTest(HeatTestCase):
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
{'method': 'get_template',
{'namespace': None,
'method': 'get_template',
'args': {'stack_identity': identity},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("AttributeError"))
@ -753,7 +774,8 @@ class CfnStackControllerTest(HeatTestCase):
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -778,11 +800,13 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
{'method': 'get_template',
{'namespace': None,
'method': 'get_template',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(engine_resp)
@ -824,12 +848,14 @@ class CfnStackControllerTest(HeatTestCase):
# Stub out the RPC call to the engine with a pre-canned response
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
# Engine returns None when delete successful
rpc.call(dummy_req.context, self.topic,
{'method': 'delete_stack',
{'namespace': None,
'method': 'delete_stack',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(None)
@ -851,14 +877,16 @@ class CfnStackControllerTest(HeatTestCase):
# Stub out the RPC call to the engine with a pre-canned response
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
rpc.call(dummy_req.context, self.topic,
{'method': 'delete_stack',
{'namespace': None,
'method': 'delete_stack',
'args': {'stack_identity': identity},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("AttributeError"))
@ -880,7 +908,8 @@ class CfnStackControllerTest(HeatTestCase):
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -920,11 +949,13 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
{'method': 'list_events',
{'namespace': None,
'method': 'list_events',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(engine_resp)
@ -960,11 +991,13 @@ class CfnStackControllerTest(HeatTestCase):
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
{'method': 'list_events',
{'namespace': None,
'method': 'list_events',
'args': {'stack_identity': identity},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("Exception"))
@ -985,7 +1018,8 @@ class CfnStackControllerTest(HeatTestCase):
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -1030,7 +1064,8 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
args = {
@ -1038,7 +1073,8 @@ class CfnStackControllerTest(HeatTestCase):
'resource_name': dummy_req.params.get('LogicalResourceId'),
}
rpc.call(dummy_req.context, self.topic,
{'method': 'describe_stack_resource',
{'namespace': None,
'method': 'describe_stack_resource',
'args': args,
'version': self.api_version}, None).AndReturn(engine_resp)
@ -1076,7 +1112,8 @@ class CfnStackControllerTest(HeatTestCase):
# Stub out the RPC call to the engine with a pre-canned response
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -1100,7 +1137,8 @@ class CfnStackControllerTest(HeatTestCase):
# Stub out the RPC call to the engine with a pre-canned response
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
args = {
@ -1108,7 +1146,8 @@ class CfnStackControllerTest(HeatTestCase):
'resource_name': dummy_req.params.get('LogicalResourceId'),
}
rpc.call(dummy_req.context, self.topic,
{'method': 'describe_stack_resource',
{'namespace': None,
'method': 'describe_stack_resource',
'args': args,
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("ResourceNotFound"))
@ -1153,7 +1192,8 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
args = {
@ -1161,7 +1201,8 @@ class CfnStackControllerTest(HeatTestCase):
'resource_name': dummy_req.params.get('LogicalResourceId'),
}
rpc.call(dummy_req.context, self.topic,
{'method': 'describe_stack_resources',
{'namespace': None,
'method': 'describe_stack_resources',
'args': args,
'version': self.api_version}, None).AndReturn(engine_resp)
@ -1197,7 +1238,8 @@ class CfnStackControllerTest(HeatTestCase):
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -1242,7 +1284,8 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'find_physical_resource',
{'namespace': None,
'method': 'find_physical_resource',
'args': {'physical_resource_id':
'a3455d8c-9f88-404d-a85b-5315293e67de'},
'version': self.api_version}, None).AndReturn(identity)
@ -1251,7 +1294,8 @@ class CfnStackControllerTest(HeatTestCase):
'resource_name': dummy_req.params.get('LogicalResourceId'),
}
rpc.call(dummy_req.context, self.topic,
{'method': 'describe_stack_resources',
{'namespace': None,
'method': 'describe_stack_resources',
'args': args,
'version': self.api_version}, None).AndReturn(engine_resp)
@ -1288,7 +1332,8 @@ class CfnStackControllerTest(HeatTestCase):
# Stub out the RPC call to the engine with a pre-canned response
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'find_physical_resource',
{'namespace': None,
'method': 'find_physical_resource',
'args': {'physical_resource_id':
'aaaaaaaa-9f88-404d-cccc-ffffffffffff'},
'version': self.api_version},
@ -1346,11 +1391,13 @@ class CfnStackControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
{'method': 'list_stack_resources',
{'namespace': None,
'method': 'list_stack_resources',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(engine_resp)
@ -1381,7 +1428,8 @@ class CfnStackControllerTest(HeatTestCase):
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))

View File

@ -140,7 +140,8 @@ class WatchControllerTest(HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
{'args': {'watch_name': watch_name},
{'namespace': None,
'args': {'watch_name': watch_name},
'method': 'show_watch',
'version': self.api_version},
None).AndReturn(engine_resp)
@ -238,7 +239,8 @@ class WatchControllerTest(HeatTestCase):
# and pass None/None for namespace/watch_name which returns
# all metric data which we post-process in the API
rpc.call(dummy_req.context, self.topic,
{'args': {'namespace': None, 'metric_name': None},
{'namespace': None,
'args': {'metric_namespace': None, 'metric_name': None},
'method': 'show_watch_metric',
'version': self.api_version},
None).AndReturn(engine_resp)
@ -317,9 +319,11 @@ class WatchControllerTest(HeatTestCase):
# and pass None/None for namespace/watch_name which returns
# all metric data which we post-process in the API
rpc.call(dummy_req.context, self.topic, {'args':
{'namespace': None,
'metric_name': None},
'method': 'show_watch_metric', 'version': self.api_version},
{'metric_namespace': None,
'metric_name': None},
'namespace': None,
'method': 'show_watch_metric',
'version': self.api_version},
None).AndReturn(engine_resp)
self.m.ReplayAll()
@ -375,10 +379,11 @@ class WatchControllerTest(HeatTestCase):
# Current engine implementation means we filter in the API
# and pass None/None for namespace/watch_name which returns
# all metric data which we post-process in the API
rpc.call(dummy_req.context, self.topic, {'args':
{'namespace': None,
'metric_name': None},
'method': 'show_watch_metric', 'version': self.api_version},
rpc.call(dummy_req.context, self.topic,
{'args': {'metric_namespace': None, 'metric_name': None},
'namespace': None,
'method': 'show_watch_metric',
'version': self.api_version},
None).AndReturn(engine_resp)
self.m.ReplayAll()
@ -445,6 +450,7 @@ class WatchControllerTest(HeatTestCase):
'Unit': u'Count',
'Dimensions': []}},
'watch_name': u'HttpFailureAlarm'},
'namespace': None,
'method': 'create_watch_data',
'version': self.api_version},
None).AndReturn(engine_resp)
@ -480,6 +486,7 @@ class WatchControllerTest(HeatTestCase):
{'args':
{'state': state_map[state],
'watch_name': u'HttpFailureAlarm'},
'namespace': None,
'method': 'set_watch_state',
'version': self.api_version},
None).AndReturn(engine_resp)

View File

@ -245,7 +245,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_stacks',
{'namespace': None,
'method': 'list_stacks',
'args': {},
'version': self.api_version},
None).AndReturn(engine_resp)
@ -276,7 +277,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_stacks',
{'namespace': None,
'method': 'list_stacks',
'args': {},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("AttributeError"))
@ -292,7 +294,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_stacks',
{'namespace': None,
'method': 'list_stacks',
'args': {},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("Exception"))
@ -317,7 +320,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'create_stack',
{'namespace': None,
'method': 'create_stack',
'args': {'stack_name': identity.stack_name,
'template': template,
'params': parameters,
@ -350,7 +354,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'create_stack',
{'namespace': None,
'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': parameters,
@ -378,7 +383,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'create_stack',
{'namespace': None,
'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': parameters,
@ -406,7 +412,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'create_stack',
{'namespace': None,
'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': parameters,
@ -429,7 +436,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': identity.stack_name},
'version': self.api_version},
None).AndReturn(identity)
@ -468,7 +476,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -485,7 +494,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': identity.stack_name},
'version': self.api_version},
None).AndReturn(identity)
@ -510,7 +520,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'identify_stack',
{'namespace': None,
'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -555,7 +566,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'show_stack',
{'namespace': None,
'method': 'show_stack',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndReturn(engine_resp)
@ -595,7 +607,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'show_stack',
{'namespace': None,
'method': 'show_stack',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -615,7 +628,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'show_stack',
{'namespace': None,
'method': 'show_stack',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("InvalidTenant"))
@ -635,7 +649,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'get_template',
{'namespace': None,
'method': 'get_template',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndReturn(template)
@ -655,7 +670,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'get_template',
{'namespace': None,
'method': 'get_template',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -685,7 +701,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'update_stack',
{'namespace': None,
'method': 'update_stack',
'args': {'stack_identity': dict(identity),
'template': template,
'params': parameters,
@ -716,7 +733,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'update_stack',
{'namespace': None,
'method': 'update_stack',
'args': {'stack_identity': dict(identity),
'template': template,
'params': parameters,
@ -747,7 +765,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
# Engine returns None when delete successful
rpc.call(req.context, self.topic,
{'method': 'delete_stack',
{'namespace': None,
'method': 'delete_stack',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndReturn(None)
@ -774,7 +793,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
# Engine returns None when delete successful
rpc.call(req.context, self.topic,
{'method': 'delete_stack',
{'namespace': None,
'method': 'delete_stack',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -807,7 +827,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'validate_template',
{'namespace': None,
'method': 'validate_template',
'args': {'template': template},
'version': self.api_version},
None).AndReturn(engine_response)
@ -828,7 +849,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'validate_template',
{'namespace': None,
'method': 'validate_template',
'args': {'template': template},
'version': self.api_version},
None).AndReturn({'Error': 'fubar'})
@ -848,7 +870,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_resource_types',
{'namespace': None,
'method': 'list_resource_types',
'args': {},
'version': self.api_version},
None).AndReturn(engine_response)
@ -867,7 +890,8 @@ class StackControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_resource_types',
{'namespace': None,
'method': 'list_resource_types',
'args': {},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("ValueError"))
@ -920,7 +944,8 @@ class ResourceControllerTest(ControllerTest, HeatTestCase):
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_stack_resources',
{'namespace': None,
'method': 'list_stack_resources',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
@ -954,7 +979,8 @@ class ResourceControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_stack_resources',
{'namespace': None,
'method': 'list_stack_resources',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -992,7 +1018,8 @@ class ResourceControllerTest(ControllerTest, HeatTestCase):
}
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'describe_stack_resource',
{'namespace': None,
'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
@ -1035,7 +1062,8 @@ class ResourceControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'describe_stack_resource',
{'namespace': None,
'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
@ -1061,7 +1089,8 @@ class ResourceControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'describe_stack_resource',
{'namespace': None,
'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
@ -1087,7 +1116,8 @@ class ResourceControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'describe_stack_resource',
{'namespace': None,
'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
@ -1127,7 +1157,8 @@ class ResourceControllerTest(ControllerTest, HeatTestCase):
}
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'describe_stack_resource',
{'namespace': None,
'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
@ -1155,7 +1186,8 @@ class ResourceControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'describe_stack_resource',
{'namespace': None,
'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
@ -1181,7 +1213,8 @@ class ResourceControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'describe_stack_resource',
{'namespace': None,
'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
@ -1254,7 +1287,8 @@ class EventControllerTest(ControllerTest, HeatTestCase):
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_events',
{'namespace': None,
'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
@ -1314,7 +1348,8 @@ class EventControllerTest(ControllerTest, HeatTestCase):
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_events',
{'namespace': None,
'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
@ -1353,7 +1388,8 @@ class EventControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_events',
{'namespace': None,
'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
@ -1395,7 +1431,8 @@ class EventControllerTest(ControllerTest, HeatTestCase):
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_events',
{'namespace': None,
'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
@ -1453,7 +1490,8 @@ class EventControllerTest(ControllerTest, HeatTestCase):
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_events',
{'namespace': None,
'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
@ -1516,7 +1554,8 @@ class EventControllerTest(ControllerTest, HeatTestCase):
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_events',
{'namespace': None,
'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
@ -1559,7 +1598,8 @@ class EventControllerTest(ControllerTest, HeatTestCase):
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_events',
{'namespace': None,
'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
@ -1584,7 +1624,8 @@ class EventControllerTest(ControllerTest, HeatTestCase):
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
{'method': 'list_events',
{'namespace': None,
'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))

View File

@ -834,13 +834,14 @@ class stackServiceTest(stackServiceTestBase):
watch = db_api.watch_data_create(self.ctx, values)
# Check there is one result returned
result = self.man.show_watch_metric(self.ctx, namespace=None,
result = self.man.show_watch_metric(self.ctx,
metric_namespace=None,
metric_name=None)
self.assertEqual(1, len(result))
# Create another metric datapoint and check we get two
watch = db_api.watch_data_create(self.ctx, values)
result = self.man.show_watch_metric(self.ctx, namespace=None,
result = self.man.show_watch_metric(self.ctx, metric_namespace=None,
metric_name=None)
self.assertEqual(2, len(result))

View File

@ -308,8 +308,8 @@ class StackTest(HeatTestCase):
setup_dummy_db()
self.ctx = context.get_admin_context()
self.m.StubOutWithMock(self.ctx, 'username')
self.ctx.username = self.username
self.m.StubOutWithMock(self.ctx, 'user')
self.ctx.user = self.username
self.ctx.tenant_id = 'test_tenant'
generic_rsrc.GenericResource.properties_schema = {}

View File

@ -156,7 +156,7 @@ class EngineRpcAPITestCase(unittest.TestCase):
def test_show_watch_metric(self):
self._test_engine_api('show_watch_metric', 'call',
namespace=None, metric_name=None)
metric_namespace=None, metric_name=None)
def test_set_watch_state(self):
self._test_engine_api('set_watch_state', 'call',

View File

@ -21,6 +21,7 @@ iso8601>=0.1.4
kombu>=1.0.4
argparse
lxml>=2.3,<=2.3.5
six
sqlalchemy-migrate>=0.7.2
python-novaclient>=2.11.0,<3
PasteDeploy==1.5.0