Update openstack-common

Change-Id: I002574a60b4f59543bc6aa73256c2f0b3b79d378
Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
This commit is contained in:
Angus Salkeld 2012-09-13 13:50:24 +10:00
parent 04701a275f
commit 61ba35be90
19 changed files with 301 additions and 150 deletions

View File

@ -367,6 +367,11 @@ class ConfigFileValueError(Error):
pass pass
def _fixpath(p):
"""Apply tilde expansion and absolutization to a path."""
return os.path.abspath(os.path.expanduser(p))
def _get_config_dirs(project=None): def _get_config_dirs(project=None):
"""Return a list of directors where config files may be located. """Return a list of directors where config files may be located.
@ -384,11 +389,9 @@ def _get_config_dirs(project=None):
~/ ~/
/etc/ /etc/
""" """
fix_path = lambda p: os.path.abspath(os.path.expanduser(p))
cfg_dirs = [ cfg_dirs = [
fix_path(os.path.join('~', '.' + project)) if project else None, _fixpath(os.path.join('~', '.' + project)) if project else None,
fix_path('~'), _fixpath('~'),
os.path.join('/etc', project) if project else None, os.path.join('/etc', project) if project else None,
'/etc' '/etc'
] ]
@ -464,7 +467,7 @@ def _is_opt_registered(opts, opt):
:raises: DuplicateOptError if a naming conflict is detected :raises: DuplicateOptError if a naming conflict is detected
""" """
if opt.dest in opts: if opt.dest in opts:
if opts[opt.dest]['opt'] is not opt: if opts[opt.dest]['opt'] != opt:
raise DuplicateOptError(opt.name) raise DuplicateOptError(opt.name)
return True return True
else: else:
@ -527,6 +530,9 @@ class Opt(object):
else: else:
self.deprecated_name = None self.deprecated_name = None
def __ne__(self, another):
return vars(self) != vars(another)
def _get_from_config_parser(self, cparser, section): def _get_from_config_parser(self, cparser, section):
"""Retrieves the option value from a MultiConfigParser object. """Retrieves the option value from a MultiConfigParser object.
@ -806,7 +812,7 @@ class OptGroup(object):
if _is_opt_registered(self._opts, opt): if _is_opt_registered(self._opts, opt):
return False return False
self._opts[opt.dest] = {'opt': opt, 'override': None, 'default': None} self._opts[opt.dest] = {'opt': opt}
return True return True
@ -1084,7 +1090,7 @@ class ConfigOpts(collections.Mapping):
if _is_opt_registered(self._opts, opt): if _is_opt_registered(self._opts, opt):
return False return False
self._opts[opt.dest] = {'opt': opt, 'override': None, 'default': None} self._opts[opt.dest] = {'opt': opt}
return True return True
@ -1153,6 +1159,25 @@ class ConfigOpts(collections.Mapping):
for opt in opts: for opt in opts:
self.unregister_opt(opt, group, clear_cache=False) self.unregister_opt(opt, group, clear_cache=False)
def import_opt(self, name, module_str, group=None):
"""Import an option definition from a module.
Import a module and check that a given option is registered.
This is intended for use with global configuration objects
like cfg.CONF where modules commonly register options with
CONF at module load time. If one module requires an option
defined by another module it can use this method to explicitly
declare the dependency.
:param name: the name/dest of the opt
:param module_str: the name of a module to import
:param group: an option OptGroup object or group name
:raises: NoSuchOptError, NoSuchGroupError
"""
__import__(module_str)
self._get_opt_info(name, group)
@__clear_cache @__clear_cache
def set_override(self, name, override, group=None): def set_override(self, name, override, group=None):
"""Override an opt value. """Override an opt value.
@ -1183,6 +1208,33 @@ class ConfigOpts(collections.Mapping):
opt_info = self._get_opt_info(name, group) opt_info = self._get_opt_info(name, group)
opt_info['default'] = default opt_info['default'] = default
@__clear_cache
def clear_override(self, name, group=None):
"""Clear an override an opt value.
Clear a previously set override of the command line, config file
and default values of a given option.
:param name: the name/dest of the opt
:param group: an option OptGroup object or group name
:raises: NoSuchOptError, NoSuchGroupError
"""
opt_info = self._get_opt_info(name, group)
opt_info.pop('override', None)
@__clear_cache
def clear_default(self, name, group=None):
"""Clear an override an opt's default value.
Clear a previously set override of the default value of given option.
:param name: the name/dest of the opt
:param group: an option OptGroup object or group name
:raises: NoSuchOptError, NoSuchGroupError
"""
opt_info = self._get_opt_info(name, group)
opt_info.pop('default', None)
def _all_opt_infos(self): def _all_opt_infos(self):
"""A generator function for iteration opt infos.""" """A generator function for iteration opt infos."""
for info in self._opts.values(): for info in self._opts.values():
@ -1199,8 +1251,8 @@ class ConfigOpts(collections.Mapping):
def _unset_defaults_and_overrides(self): def _unset_defaults_and_overrides(self):
"""Unset any default or override on all options.""" """Unset any default or override on all options."""
for info, group in self._all_opt_infos(): for info, group in self._all_opt_infos():
info['default'] = None info.pop('default', None)
info['override'] = None info.pop('override', None)
def disable_interspersed_args(self): def disable_interspersed_args(self):
"""Set parsing to stop on the first non-option. """Set parsing to stop on the first non-option.
@ -1246,10 +1298,10 @@ class ConfigOpts(collections.Mapping):
""" """
dirs = [] dirs = []
if self.config_dir: if self.config_dir:
dirs.append(self.config_dir) dirs.append(_fixpath(self.config_dir))
for cf in reversed(self.config_file): for cf in reversed(self.config_file):
dirs.append(os.path.dirname(cf)) dirs.append(os.path.dirname(_fixpath(cf)))
dirs.extend(_get_config_dirs(self.project)) dirs.extend(_get_config_dirs(self.project))
@ -1323,10 +1375,10 @@ class ConfigOpts(collections.Mapping):
return self.GroupAttr(self, self._get_group(name)) return self.GroupAttr(self, self._get_group(name))
info = self._get_opt_info(name, group) info = self._get_opt_info(name, group)
default, opt, override = [info[k] for k in sorted(info.keys())] opt = info['opt']
if override is not None: if 'override' in info:
return override return info['override']
values = [] values = []
if self._cparser is not None: if self._cparser is not None:
@ -1354,8 +1406,8 @@ class ConfigOpts(collections.Mapping):
if values: if values:
return values return values
if default is not None: if 'default' in info:
return default return info['default']
return opt.default return opt.default
@ -1430,6 +1482,8 @@ class ConfigOpts(collections.Mapping):
config_dir_glob = os.path.join(self.config_dir, '*.conf') config_dir_glob = os.path.join(self.config_dir, '*.conf')
config_files += sorted(glob.glob(config_dir_glob)) config_files += sorted(glob.glob(config_dir_glob))
config_files = [_fixpath(p) for p in config_files]
self._cparser = MultiConfigParser() self._cparser = MultiConfigParser()
try: try:
@ -1447,10 +1501,10 @@ class ConfigOpts(collections.Mapping):
:raises: RequiredOptError :raises: RequiredOptError
""" """
for info, group in self._all_opt_infos(): for info, group in self._all_opt_infos():
default, opt, override = [info[k] for k in sorted(info.keys())] opt = info['opt']
if opt.required: if opt.required:
if (default is not None or override is not None): if ('default' in info or 'override' in info):
continue continue
if self._get(opt.name, group) is None: if self._get(opt.name, group) is None:

View File

@ -19,7 +19,6 @@
Exceptions common to OpenStack projects Exceptions common to OpenStack projects
""" """
import itertools
import logging import logging

View File

@ -30,14 +30,14 @@ def save_and_reraise_exception():
"""Save current exception, run some code and then re-raise. """Save current exception, run some code and then re-raise.
In some cases the exception context can be cleared, resulting in None In some cases the exception context can be cleared, resulting in None
being attempted to be reraised after an exception handler is run. This being attempted to be re-raised after an exception handler is run. This
can happen when eventlet switches greenthreads or when running an can happen when eventlet switches greenthreads or when running an
exception handler, code raises and catches an exception. In both exception handler, code raises and catches an exception. In both
cases the exception context will be cleared. cases the exception context will be cleared.
To work around this, we save the exception state, run handler code, and To work around this, we save the exception state, run handler code, and
then re-raise the original exception. If another exception occurs, the then re-raise the original exception. If another exception occurs, the
saved exception is logged and the new exception is reraised. saved exception is logged and the new exception is re-raised.
""" """
type_, value, tb = sys.exc_info() type_, value, tb = sys.exc_info()
try: try:

View File

@ -29,7 +29,7 @@ def import_class(import_str):
try: try:
__import__(mod_str) __import__(mod_str)
return getattr(sys.modules[mod_str], class_str) return getattr(sys.modules[mod_str], class_str)
except (ImportError, ValueError, AttributeError), exc: except (ValueError, AttributeError), exc:
raise ImportError('Class %s cannot be found (%s)' % raise ImportError('Class %s cannot be found (%s)' %
(class_str, (class_str,
traceback.format_exception(*sys.exc_info()))) traceback.format_exception(*sys.exc_info())))

View File

@ -107,9 +107,11 @@ def to_primitive(value, convert_instances=False, level=0):
elif hasattr(value, 'iteritems'): elif hasattr(value, 'iteritems'):
return to_primitive(dict(value.iteritems()), return to_primitive(dict(value.iteritems()),
convert_instances=convert_instances, convert_instances=convert_instances,
level=level) level=level + 1)
elif hasattr(value, '__iter__'): elif hasattr(value, '__iter__'):
return to_primitive(list(value), level) return to_primitive(list(value),
convert_instances=convert_instances,
level=level)
elif convert_instances and hasattr(value, '__dict__'): elif convert_instances and hasattr(value, '__dict__'):
# Likely an instance of something. Watch for cycles. # Likely an instance of something. Watch for cycles.
# Ignore class member vars. # Ignore class member vars.

View File

@ -54,25 +54,24 @@ log_opts = [
'%(message)s', '%(message)s',
help='format string to use for log messages with context'), help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string', cfg.StrOpt('logging_default_format_string',
default='%(asctime)s %(levelname)s %(name)s [-] %(instance)s' default='%(asctime)s %(process)d %(levelname)s %(name)s [-]'
'%(message)s', ' %(instance)s%(message)s',
help='format string to use for log messages without context'), help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix', cfg.StrOpt('logging_debug_format_suffix',
default='from (pid=%(process)d) %(funcName)s ' default='%(funcName)s %(pathname)s:%(lineno)d',
'%(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'), help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix', cfg.StrOpt('logging_exception_prefix',
default='%(asctime)s TRACE %(name)s %(instance)s', default='%(asctime)s %(process)d TRACE %(name)s %(instance)s',
help='prefix each line of exception output with this format'), help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels', cfg.ListOpt('default_log_levels',
default=[ default=[
'amqplib=WARN', 'amqplib=WARN',
'sqlalchemy=WARN', 'sqlalchemy=WARN',
'boto=WARN', 'boto=WARN',
'suds=INFO', 'suds=INFO',
'keystone=INFO', 'keystone=INFO',
'eventlet.wsgi.server=WARN' 'eventlet.wsgi.server=WARN'
], ],
help='list of logger=LEVEL pairs'), help='list of logger=LEVEL pairs'),
cfg.BoolOpt('publish_errors', cfg.BoolOpt('publish_errors',
default=False, default=False,
@ -89,7 +88,7 @@ log_opts = [
default='[instance: %(uuid)s] ', default='[instance: %(uuid)s] ',
help='If an instance UUID is passed with the log message, ' help='If an instance UUID is passed with the log message, '
'format it like this'), 'format it like this'),
] ]
generic_log_opts = [ generic_log_opts = [
@ -105,7 +104,7 @@ generic_log_opts = [
cfg.StrOpt('logfile_mode', cfg.StrOpt('logfile_mode',
default='0644', default='0644',
help='Default file mode used when creating log files'), help='Default file mode used when creating log files'),
] ]
CONF = cfg.CONF CONF = cfg.CONF
@ -208,9 +207,9 @@ class JSONFormatter(logging.Formatter):
def formatException(self, ei, strip_newlines=True): def formatException(self, ei, strip_newlines=True):
lines = traceback.format_exception(*ei) lines = traceback.format_exception(*ei)
if strip_newlines: if strip_newlines:
lines = [itertools.ifilter(lambda x: x, lines = [itertools.ifilter(
line.rstrip().splitlines()) lambda x: x,
for line in lines] line.rstrip().splitlines()) for line in lines]
lines = list(itertools.chain(*lines)) lines = list(itertools.chain(*lines))
return lines return lines
@ -247,26 +246,27 @@ class JSONFormatter(logging.Formatter):
class PublishErrorsHandler(logging.Handler): class PublishErrorsHandler(logging.Handler):
def emit(self, record): def emit(self, record):
if 'list_notifier_drivers' in CONF: if ('heat.openstack.common.notifier.log_notifier' in
if ('heat.openstack.common.notifier.log_notifier' in CONF.notification_driver):
CONF.list_notifier_drivers): return
return
notifier.api.notify(None, 'error.publisher', notifier.api.notify(None, 'error.publisher',
'error_notification', 'error_notification',
notifier.api.ERROR, notifier.api.ERROR,
dict(error=record.msg)) dict(error=record.msg))
def handle_exception(type, value, tb): def _create_logging_excepthook(product_name):
extra = {} def logging_excepthook(type, value, tb):
if CONF.verbose: extra = {}
extra['exc_info'] = (type, value, tb) if CONF.verbose:
getLogger().critical(str(value), **extra) extra['exc_info'] = (type, value, tb)
getLogger(product_name).critical(str(value), **extra)
return logging_excepthook
def setup(product_name): def setup(product_name):
"""Setup logging.""" """Setup logging."""
sys.excepthook = handle_exception sys.excepthook = _create_logging_excepthook(product_name)
if CONF.log_config: if CONF.log_config:
try: try:
@ -357,17 +357,6 @@ def _setup_logging_from_conf(product_name):
for handler in log_root.handlers: for handler in log_root.handlers:
logger.addHandler(handler) logger.addHandler(handler)
# NOTE(jkoelker) Clear the handlers for the root logger that was setup
# by basicConfig in nova/__init__.py and install the
# NullHandler.
root = logging.getLogger()
for handler in root.handlers:
root.removeHandler(handler)
handler = NullHandler()
handler.setFormatter(logging.Formatter())
root.addHandler(handler)
_loggers = {} _loggers = {}
@ -405,8 +394,12 @@ class LegacyFormatter(logging.Formatter):
def format(self, record): def format(self, record):
"""Uses contextstring if request_id is set, otherwise default.""" """Uses contextstring if request_id is set, otherwise default."""
if 'instance' not in record.__dict__: # NOTE(sdague): default the fancier formating params
record.__dict__['instance'] = '' # to an empty string so we don't throw an exception if
# they get used
for key in ('instance', 'color'):
if key not in record.__dict__:
record.__dict__[key] = ''
if record.__dict__.get('request_id', None): if record.__dict__.get('request_id', None):
self._fmt = CONF.logging_context_format_string self._fmt = CONF.logging_context_format_string

View File

@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import inspect
import uuid import uuid
from heat.openstack.common import cfg from heat.openstack.common import cfg
@ -28,16 +27,17 @@ from heat.openstack.common import timeutils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
notifier_opts = [ notifier_opts = [
cfg.StrOpt('notification_driver', cfg.MultiStrOpt('notification_driver',
default='heat.openstack.common.notifier.no_op_notifier', default=[],
help='Default driver for sending notifications'), deprecated_name='list_notifier_drivers',
help='Driver or drivers to handle sending notifications'),
cfg.StrOpt('default_notification_level', cfg.StrOpt('default_notification_level',
default='INFO', default='INFO',
help='Default notification level for outgoing notifications'), help='Default notification level for outgoing notifications'),
cfg.StrOpt('default_publisher_id', cfg.StrOpt('default_publisher_id',
default='$host', default='$host',
help='Default publisher_id for outgoing notifications'), help='Default publisher_id for outgoing notifications'),
] ]
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_opts(notifier_opts) CONF.register_opts(notifier_opts)
@ -122,21 +122,60 @@ def notify(context, publisher_id, event_type, priority, payload):
""" """
if priority not in log_levels: if priority not in log_levels:
raise BadPriorityException( raise BadPriorityException(
_('%s not in valid priorities') % priority) _('%s not in valid priorities') % priority)
# Ensure everything is JSON serializable. # Ensure everything is JSON serializable.
payload = jsonutils.to_primitive(payload, convert_instances=True) payload = jsonutils.to_primitive(payload, convert_instances=True)
driver = importutils.import_module(CONF.notification_driver)
msg = dict(message_id=str(uuid.uuid4()), msg = dict(message_id=str(uuid.uuid4()),
publisher_id=publisher_id, publisher_id=publisher_id,
event_type=event_type, event_type=event_type,
priority=priority, priority=priority,
payload=payload, payload=payload,
timestamp=str(timeutils.utcnow())) timestamp=str(timeutils.utcnow()))
try:
driver.notify(context, msg) for driver in _get_drivers():
except Exception, e: try:
LOG.exception(_("Problem '%(e)s' attempting to " driver.notify(context, msg)
"send to notification system. Payload=%(payload)s") % except Exception, e:
locals()) LOG.exception(_("Problem '%(e)s' attempting to "
"send to notification system. Payload=%(payload)s") %
locals())
_drivers = None
def _get_drivers():
"""Instantiate, cache, and return drivers based on the CONF."""
global _drivers
if _drivers is None:
_drivers = {}
for notification_driver in CONF.notification_driver:
add_driver(notification_driver)
return _drivers.values()
def add_driver(notification_driver):
"""Add a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
if isinstance(notification_driver, basestring):
# Load and add
try:
driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver
except ImportError as e:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
notification_driver)
else:
# Driver is already loaded; just add the object.
_drivers[notification_driver] = notification_driver
def _reset_drivers():
"""Used by unit tests to reset the drivers."""
global _drivers
_drivers = None

View File

@ -30,6 +30,6 @@ def notify(_context, message):
CONF.default_notification_level) CONF.default_notification_level)
priority = priority.lower() priority = priority.lower()
logger = logging.getLogger( logger = logging.getLogger(
'heat.openstack.common.notification.%s' % 'heat.openstack.common.notification.%s' %
message['event_type']) message['event_type'])
getattr(logger, priority)(jsonutils.dumps(message)) getattr(logger, priority)(jsonutils.dumps(message))

View File

@ -22,9 +22,9 @@ from heat.openstack.common import rpc
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
notification_topic_opt = cfg.ListOpt('notification_topics', notification_topic_opt = cfg.ListOpt(
default=['notifications', ], 'notification_topics', default=['notifications', ],
help='AMQP topic used for openstack notifications') help='AMQP topic used for openstack notifications')
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_opt(notification_topic_opt) CONF.register_opt(notification_topic_opt)

View File

@ -49,15 +49,21 @@ rpc_opts = [
cfg.ListOpt('allowed_rpc_exception_modules', cfg.ListOpt('allowed_rpc_exception_modules',
default=['heat.openstack.common.exception', default=['heat.openstack.common.exception',
'nova.exception', 'nova.exception',
'cinder.exception',
], ],
help='Modules of exceptions that are permitted to be recreated' help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'), 'upon receiving exception data from an rpc call.'),
cfg.StrOpt('control_exchange',
default='nova',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.BoolOpt('fake_rabbit', cfg.BoolOpt('fake_rabbit',
default=False, default=False,
help='If passed, use a fake RabbitMQ provider'), help='If passed, use a fake RabbitMQ provider'),
#
# The following options are not registered here, but are expected to be
# present. The project using this library must register these options with
# the configuration so that project-specific defaults may be defined.
#
#cfg.StrOpt('control_exchange',
# default='nova',
# help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
] ]
cfg.CONF.register_opts(rpc_opts) cfg.CONF.register_opts(rpc_opts)

View File

@ -34,6 +34,7 @@ from eventlet import greenpool
from eventlet import pools from eventlet import pools
from eventlet import semaphore from eventlet import semaphore
from heat.openstack.common import cfg
from heat.openstack.common import excutils from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _
from heat.openstack.common import local from heat.openstack.common import local
@ -416,3 +417,10 @@ def notify(conf, context, topic, msg, connection_pool):
def cleanup(connection_pool): def cleanup(connection_pool):
if connection_pool: if connection_pool:
connection_pool.empty() connection_pool.empty()
def get_control_exchange(conf):
try:
return conf.control_exchange
except cfg.NoSuchOptError:
return 'openstack'

View File

@ -19,10 +19,8 @@
import copy import copy
import logging import logging
import sys
import traceback import traceback
from heat.openstack.common import cfg
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils from heat.openstack.common import importutils
from heat.openstack.common import jsonutils from heat.openstack.common import jsonutils
@ -108,7 +106,7 @@ class Connection(object):
""" """
raise NotImplementedError() raise NotImplementedError()
def create_consumer(self, conf, topic, proxy, fanout=False): def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer on this connection. """Create a consumer on this connection.
A consumer is associated with a message queue on the backend message A consumer is associated with a message queue on the backend message
@ -117,7 +115,6 @@ class Connection(object):
off of the queue will determine which method gets called on the proxy off of the queue will determine which method gets called on the proxy
object. object.
:param conf: An openstack.common.cfg configuration object.
:param topic: This is a name associated with what to consume from. :param topic: This is a name associated with what to consume from.
Multiple instances of a service may consume from the same Multiple instances of a service may consume from the same
topic. For example, all instances of nova-compute consume topic. For example, all instances of nova-compute consume
@ -133,7 +130,7 @@ class Connection(object):
""" """
raise NotImplementedError() raise NotImplementedError()
def create_worker(self, conf, topic, proxy, pool_name): def create_worker(self, topic, proxy, pool_name):
"""Create a worker on this connection. """Create a worker on this connection.
A worker is like a regular consumer of messages directed to a A worker is like a regular consumer of messages directed to a
@ -143,7 +140,6 @@ class Connection(object):
be asked to process it. Load is distributed across the members be asked to process it. Load is distributed across the members
of the pool in round-robin fashion. of the pool in round-robin fashion.
:param conf: An openstack.common.cfg configuration object.
:param topic: This is a name associated with what to consume from. :param topic: This is a name associated with what to consume from.
Multiple instances of a service may consume from the same Multiple instances of a service may consume from the same
topic. topic.

View File

@ -210,10 +210,10 @@ class TopicConsumer(ConsumerBase):
'auto_delete': False, 'auto_delete': False,
'exclusive': False} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange = kombu.entity.Exchange(name=conf.control_exchange, exchange = kombu.entity.Exchange(
type='topic', name=rpc_amqp.get_control_exchange(conf),
durable=options['durable'], type='topic', durable=options['durable'],
auto_delete=options['auto_delete']) auto_delete=options['auto_delete'])
super(TopicConsumer, self).__init__(channel, super(TopicConsumer, self).__init__(channel,
callback, callback,
tag, tag,
@ -307,8 +307,9 @@ class TopicPublisher(Publisher):
'auto_delete': False, 'auto_delete': False,
'exclusive': False} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
super(TopicPublisher, self).__init__(channel, conf.control_exchange, super(TopicPublisher, self).__init__(channel,
topic, type='topic', **options) rpc_amqp.get_control_exchange(conf), topic,
type='topic', **options)
class FanoutPublisher(Publisher): class FanoutPublisher(Publisher):

View File

@ -181,9 +181,8 @@ class TopicConsumer(ConsumerBase):
""" """
super(TopicConsumer, self).__init__(session, callback, super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (conf.control_exchange, "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
topic), {}, name or topic, {})
{}, name or topic, {})
class FanoutConsumer(ConsumerBase): class FanoutConsumer(ConsumerBase):
@ -256,9 +255,8 @@ class TopicPublisher(Publisher):
def __init__(self, conf, session, topic): def __init__(self, conf, session, topic):
"""init a 'topic' publisher. """init a 'topic' publisher.
""" """
super(TopicPublisher, self).__init__( super(TopicPublisher, self).__init__(session,
session, "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic))
"%s/%s" % (conf.control_exchange, topic))
class FanoutPublisher(Publisher): class FanoutPublisher(Publisher):
@ -276,10 +274,9 @@ class NotifyPublisher(Publisher):
def __init__(self, conf, session, topic): def __init__(self, conf, session, topic):
"""init a 'topic' publisher. """init a 'topic' publisher.
""" """
super(NotifyPublisher, self).__init__( super(NotifyPublisher, self).__init__(session,
session, "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
"%s/%s" % (conf.control_exchange, topic), {"durable": True})
{"durable": True})
class Connection(object): class Connection(object):
@ -329,7 +326,7 @@ class Connection(object):
if self.conf.qpid_reconnect_interval: if self.conf.qpid_reconnect_interval:
self.connection.reconnect_interval = ( self.connection.reconnect_interval = (
self.conf.qpid_reconnect_interval) self.conf.qpid_reconnect_interval)
self.connection.hearbeat = self.conf.qpid_heartbeat self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay

View File

@ -52,7 +52,7 @@ zmq_opts = [
default=('heat.openstack.common.rpc.' default=('heat.openstack.common.rpc.'
'matchmaker.MatchMakerLocalhost'), 'matchmaker.MatchMakerLocalhost'),
help='MatchMaker driver', help='MatchMaker driver',
), ),
# The following port is unassigned by IANA as of 2012-05-21 # The following port is unassigned by IANA as of 2012-05-21
cfg.IntOpt('rpc_zmq_port', default=9501, cfg.IntOpt('rpc_zmq_port', default=9501,
@ -72,7 +72,7 @@ zmq_opts = [
# These globals are defined in register_opts(conf), # These globals are defined in register_opts(conf),
# a mandatory initialization call # a mandatory initialization call
FLAGS = None CONF = None
ZMQ_CTX = None # ZeroMQ Context, must be global. ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object matchmaker = None # memoized matchmaker object
@ -274,7 +274,7 @@ class InternalContext(object):
ctx.replies) ctx.replies)
LOG.debug(_("Sending reply")) LOG.debug(_("Sending reply"))
cast(FLAGS, ctx, topic, { cast(CONF, ctx, topic, {
'method': '-process_reply', 'method': '-process_reply',
'args': { 'args': {
'msg_id': msg_id, 'msg_id': msg_id,
@ -329,7 +329,6 @@ class ZmqBaseReactor(ConsumerBase):
def __init__(self, conf): def __init__(self, conf):
super(ZmqBaseReactor, self).__init__() super(ZmqBaseReactor, self).__init__()
self.conf = conf
self.mapping = {} self.mapping = {}
self.proxies = {} self.proxies = {}
self.threads = [] self.threads = []
@ -405,7 +404,7 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).__init__(conf) super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {} self.topic_proxy = {}
ipc_dir = conf.rpc_zmq_ipc_dir ipc_dir = CONF.rpc_zmq_ipc_dir
self.topic_proxy['zmq_replies'] = \ self.topic_proxy['zmq_replies'] = \
ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ), ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
@ -413,7 +412,7 @@ class ZmqProxy(ZmqBaseReactor):
self.sockets.append(self.topic_proxy['zmq_replies']) self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock): def consume(self, sock):
ipc_dir = self.conf.rpc_zmq_ipc_dir ipc_dir = CONF.rpc_zmq_ipc_dir
#TODO(ewindisch): use zero-copy (i.e. references, not copying) #TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv() data = sock.recv()
@ -487,7 +486,6 @@ class Connection(rpc_common.Connection):
"""Manages connections and threads.""" """Manages connections and threads."""
def __init__(self, conf): def __init__(self, conf):
self.conf = conf
self.reactor = ZmqReactor(conf) self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False): def create_consumer(self, topic, proxy, fanout=False):
@ -508,7 +506,7 @@ class Connection(rpc_common.Connection):
# Receive messages from (local) proxy # Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \ inaddr = "ipc://%s/zmq_topic_%s" % \
(self.conf.rpc_zmq_ipc_dir, topic) (CONF.rpc_zmq_ipc_dir, topic)
LOG.debug(_("Consumer is a zmq.%s"), LOG.debug(_("Consumer is a zmq.%s"),
['PULL', 'SUB'][sock_type == zmq.SUB]) ['PULL', 'SUB'][sock_type == zmq.SUB])
@ -527,7 +525,7 @@ class Connection(rpc_common.Connection):
def _cast(addr, context, msg_id, topic, msg, timeout=None): def _cast(addr, context, msg_id, topic, msg, timeout=None):
timeout_cast = timeout or FLAGS.rpc_cast_timeout timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg] payload = [RpcContext.marshal(context), msg]
with Timeout(timeout_cast, exception=rpc_common.Timeout): with Timeout(timeout_cast, exception=rpc_common.Timeout):
@ -545,13 +543,13 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
def _call(addr, context, msg_id, topic, msg, timeout=None): def _call(addr, context, msg_id, topic, msg, timeout=None):
# timeout_response is how long we wait for a response # timeout_response is how long we wait for a response
timeout = timeout or FLAGS.rpc_response_timeout timeout = timeout or CONF.rpc_response_timeout
# The msg_id is used to track replies. # The msg_id is used to track replies.
msg_id = str(uuid.uuid4().hex) msg_id = str(uuid.uuid4().hex)
# Replies always come into the reply service. # Replies always come into the reply service.
reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
LOG.debug(_("Creating payload")) LOG.debug(_("Creating payload"))
# Curry the original request into a reply method. # Curry the original request into a reply method.
@ -573,7 +571,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
with Timeout(timeout, exception=rpc_common.Timeout): with Timeout(timeout, exception=rpc_common.Timeout):
try: try:
msg_waiter = ZmqSocket( msg_waiter = ZmqSocket(
"ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir, "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
zmq.SUB, subscribe=msg_id, bind=False zmq.SUB, subscribe=msg_id, bind=False
) )
@ -599,7 +597,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
# responses for Exceptions. # responses for Exceptions.
for resp in responses: for resp in responses:
if isinstance(resp, types.DictType) and 'exc' in resp: if isinstance(resp, types.DictType) and 'exc' in resp:
raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc']) raise rpc_common.deserialize_remote_exception(CONF, resp['exc'])
return responses[-1] return responses[-1]
@ -610,7 +608,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
dispatches to the matchmaker and sends dispatches to the matchmaker and sends
message to all relevant hosts. message to all relevant hosts.
""" """
conf = FLAGS conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
queues = matchmaker.queues(topic) queues = matchmaker.queues(topic)
@ -641,26 +639,22 @@ def create_connection(conf, new=True):
def multicall(conf, *args, **kwargs): def multicall(conf, *args, **kwargs):
"""Multiple calls.""" """Multiple calls."""
register_opts(conf)
return _multi_send(_call, *args, **kwargs) return _multi_send(_call, *args, **kwargs)
def call(conf, *args, **kwargs): def call(conf, *args, **kwargs):
"""Send a message, expect a response.""" """Send a message, expect a response."""
register_opts(conf)
data = _multi_send(_call, *args, **kwargs) data = _multi_send(_call, *args, **kwargs)
return data[-1] return data[-1]
def cast(conf, *args, **kwargs): def cast(conf, *args, **kwargs):
"""Send a message expecting no reply.""" """Send a message expecting no reply."""
register_opts(conf)
_multi_send(_cast, *args, **kwargs) _multi_send(_cast, *args, **kwargs)
def fanout_cast(conf, context, topic, msg, **kwargs): def fanout_cast(conf, context, topic, msg, **kwargs):
"""Send a message to all listening and expect no reply.""" """Send a message to all listening and expect no reply."""
register_opts(conf)
# NOTE(ewindisch): fanout~ is used because it avoid splitting on . # NOTE(ewindisch): fanout~ is used because it avoid splitting on .
# and acts as a non-subtle hint to the matchmaker and ZmqProxy. # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs) _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
@ -672,7 +666,6 @@ def notify(conf, context, topic, msg, **kwargs):
Notifications are sent to topic-priority. Notifications are sent to topic-priority.
This differs from the AMQP drivers which send to topic.priority. This differs from the AMQP drivers which send to topic.priority.
""" """
register_opts(conf)
# NOTE(ewindisch): dot-priority in rpc notifier does not # NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions. # work with our assumptions.
topic.replace('.', '-') topic.replace('.', '-')
@ -684,7 +677,7 @@ def cleanup():
global ZMQ_CTX global ZMQ_CTX
global matchmaker global matchmaker
matchmaker = None matchmaker = None
ZMQ_CTX.destroy() ZMQ_CTX.term()
ZMQ_CTX = None ZMQ_CTX = None
@ -697,11 +690,11 @@ def register_opts(conf):
# We memoize through these globals # We memoize through these globals
global ZMQ_CTX global ZMQ_CTX
global matchmaker global matchmaker
global FLAGS global CONF
if not FLAGS: if not CONF:
conf.register_opts(zmq_opts) conf.register_opts(zmq_opts)
FLAGS = conf CONF = conf
# Don't re-set, if this method is called twice. # Don't re-set, if this method is called twice.
if not ZMQ_CTX: if not ZMQ_CTX:
ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts) ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)

View File

@ -112,11 +112,12 @@ class RpcProxy(object):
self._set_version(msg, version) self._set_version(msg, version)
rpc.cast(context, self._get_topic(topic), msg) rpc.cast(context, self._get_topic(topic), msg)
def fanout_cast(self, context, msg, version=None): def fanout_cast(self, context, msg, topic=None, version=None):
"""rpc.fanout_cast() a remote method. """rpc.fanout_cast() a remote method.
:param context: The request context :param context: The request context
:param msg: The message to send, including the method and args. :param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
:param version: (Optional) Override the requested API version in this :param version: (Optional) Override the requested API version in this
message. message.
@ -124,7 +125,7 @@ class RpcProxy(object):
from the remote method. from the remote method.
""" """
self._set_version(msg, version) self._set_version(msg, version)
rpc.fanout_cast(context, self.topic, msg) rpc.fanout_cast(context, self._get_topic(topic), msg)
def cast_to_server(self, context, server_params, msg, topic=None, def cast_to_server(self, context, server_params, msg, topic=None,
version=None): version=None):
@ -144,13 +145,15 @@ class RpcProxy(object):
self._set_version(msg, version) self._set_version(msg, version)
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg) rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
def fanout_cast_to_server(self, context, server_params, msg, version=None): def fanout_cast_to_server(self, context, server_params, msg, topic=None,
version=None):
"""rpc.fanout_cast_to_server() a remote method. """rpc.fanout_cast_to_server() a remote method.
:param context: The request context :param context: The request context
:param server_params: Server parameters. See rpc.cast_to_server() for :param server_params: Server parameters. See rpc.cast_to_server() for
details. details.
:param msg: The message to send, including the method and args. :param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
:param version: (Optional) Override the requested API version in this :param version: (Optional) Override the requested API version in this
message. message.
@ -158,4 +161,5 @@ class RpcProxy(object):
return values. return values.
""" """
self._set_version(msg, version) self._set_version(msg, version)
rpc.fanout_cast_to_server(context, server_params, self.topic, msg) rpc.fanout_cast_to_server(context, server_params,
self._get_topic(topic), msg)

View File

@ -52,7 +52,6 @@ def canonicalize_emails(changelog, mapping):
# Get requirements from the first file that exists # Get requirements from the first file that exists
def get_reqs_from_files(requirements_files): def get_reqs_from_files(requirements_files):
reqs_in = []
for requirements_file in requirements_files: for requirements_file in requirements_files:
if os.path.exists(requirements_file): if os.path.exists(requirements_file):
return open(requirements_file, 'r').read().split('\n') return open(requirements_file, 'r').read().split('\n')
@ -144,8 +143,8 @@ def _get_git_next_version_suffix(branch_name):
# where the bit after the last . is the short sha, and the bit between # where the bit after the last . is the short sha, and the bit between
# the last and second to last is the revno count # the last and second to last is the revno count
(revno, sha) = post_version.split(".")[-2:] (revno, sha) = post_version.split(".")[-2:]
first_half = "%(milestonever)s~%(datestamp)s" % locals() first_half = "%s~%s" % (milestonever, datestamp)
second_half = "%(revno_prefix)s%(revno)s.%(sha)s" % locals() second_half = "%s%s.%s" % (revno_prefix, revno, sha)
return ".".join((first_half, second_half)) return ".".join((first_half, second_half))

View File

@ -21,7 +21,6 @@ Time related utilities and helper functions.
import calendar import calendar
import datetime import datetime
import time
import iso8601 import iso8601
@ -94,16 +93,34 @@ def set_time_override(override_time=datetime.datetime.utcnow()):
def advance_time_delta(timedelta): def advance_time_delta(timedelta):
"""Advance overriden time using a datetime.timedelta.""" """Advance overridden time using a datetime.timedelta."""
assert(not utcnow.override_time is None) assert(not utcnow.override_time is None)
utcnow.override_time += timedelta utcnow.override_time += timedelta
def advance_time_seconds(seconds): def advance_time_seconds(seconds):
"""Advance overriden time by seconds.""" """Advance overridden time by seconds."""
advance_time_delta(datetime.timedelta(0, seconds)) advance_time_delta(datetime.timedelta(0, seconds))
def clear_time_override(): def clear_time_override():
"""Remove the overridden time.""" """Remove the overridden time."""
utcnow.override_time = None utcnow.override_time = None
def marshall_now(now=None):
"""Make an rpc-safe datetime with microseconds.
Note: tzinfo is stripped, but not required for relative times."""
if not now:
now = utcnow()
return dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
minute=now.minute, second=now.second,
microsecond=now.microsecond)
def unmarshall_time(tyme):
"""Unmarshall a datetime dict."""
return datetime.datetime(day=tyme['day'], month=tyme['month'],
year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'],
second=tyme['second'], microsecond=tyme['microsecond'])

View File

@ -20,7 +20,6 @@ System-level utilities and helper functions.
""" """
import logging import logging
import os
import random import random
import shlex import shlex
@ -65,6 +64,50 @@ def bool_from_string(subject):
return False return False
def parse_host_port(address, default_port=None):
"""
Interpret a string as a host:port pair.
An IPv6 address MUST be escaped if accompanied by a port,
because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
means both [2001:db8:85a3::8a2e:370:7334] and
[2001:db8:85a3::8a2e:370]:7334.
>>> parse_host_port('server01:80')
('server01', 80)
>>> parse_host_port('server01')
('server01', None)
>>> parse_host_port('server01', default_port=1234)
('server01', 1234)
>>> parse_host_port('[::1]:80')
('::1', 80)
>>> parse_host_port('[::1]')
('::1', None)
>>> parse_host_port('[::1]', default_port=1234)
('::1', 1234)
>>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
('2001:db8:85a3::8a2e:370:7334', 1234)
"""
if address[0] == '[':
# Escaped ipv6
_host, _port = address[1:].split(']')
host = _host
if ':' in _port:
port = _port.split(':')[1]
else:
port = default_port
else:
if address.count(':') == 1:
host, port = address.split(':')
else:
# 0 means ipv4, >1 means ipv6.
# We prohibit unescaped ipv6 addresses with port.
host = address
port = default_port
return (host, None if port is None else int(port))
def execute(*cmd, **kwargs): def execute(*cmd, **kwargs):
""" """
Helper method to execute command with optional retry. Helper method to execute command with optional retry.