Update openstack.common
This fixes #1031508 Change-Id: I2134b97615f87aa982dbbcc877e2d87c8c497244 Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
parent
779dcde0db
commit
01f4f4717d
@ -42,8 +42,8 @@ Options can be strings, integers, floats, booleans, lists or 'multi strings'::
|
||||
osapi_compute_extension_opt = cfg.MultiStrOpt('osapi_compute_extension',
|
||||
default=DEFAULT_EXTENSIONS)
|
||||
|
||||
Option schemas are registered with with the config manager at runtime, but
|
||||
before the option is referenced::
|
||||
Option schemas are registered with the config manager at runtime, but before
|
||||
the option is referenced::
|
||||
|
||||
class ExtensionManager(object):
|
||||
|
||||
@ -464,7 +464,7 @@ def _is_opt_registered(opts, opt):
|
||||
:raises: DuplicateOptError if a naming conflict is detected
|
||||
"""
|
||||
if opt.dest in opts:
|
||||
if opts[opt.dest]['opt'] is not opt:
|
||||
if opts[opt.dest]['opt'] != opt:
|
||||
raise DuplicateOptError(opt.name)
|
||||
return True
|
||||
else:
|
||||
@ -527,6 +527,9 @@ class Opt(object):
|
||||
else:
|
||||
self.deprecated_name = None
|
||||
|
||||
def __ne__(self, another):
|
||||
return vars(self) != vars(another)
|
||||
|
||||
def _get_from_config_parser(self, cparser, section):
|
||||
"""Retrieves the option value from a MultiConfigParser object.
|
||||
|
||||
|
@ -20,6 +20,7 @@ Import related utilities and helper functions.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
|
||||
def import_class(import_str):
|
||||
@ -30,7 +31,8 @@ def import_class(import_str):
|
||||
return getattr(sys.modules[mod_str], class_str)
|
||||
except (ImportError, ValueError, AttributeError), exc:
|
||||
raise ImportError('Class %s cannot be found (%s)' %
|
||||
(class_str, str(exc)))
|
||||
(class_str,
|
||||
traceback.format_exception(*sys.exc_info())))
|
||||
|
||||
|
||||
def import_object(import_str, *args, **kwargs):
|
||||
|
@ -53,7 +53,8 @@ class BaseParser(object):
|
||||
key, value = line[:colon], line[colon + 1:]
|
||||
|
||||
value = value.strip()
|
||||
if value[0] == value[-1] and value[0] == "\"" or value[0] == "'":
|
||||
if ((value and value[0] == value[-1]) and
|
||||
(value[0] == "\"" or value[0] == "'")):
|
||||
value = value[1:-1]
|
||||
return key.strip(), [value]
|
||||
|
||||
|
@ -39,6 +39,8 @@ import itertools
|
||||
import json
|
||||
import xmlrpclib
|
||||
|
||||
from ceilometer.openstack.common import timeutils
|
||||
|
||||
|
||||
def to_primitive(value, convert_instances=False, level=0):
|
||||
"""Convert a complex object into primitives.
|
||||
@ -101,13 +103,15 @@ def to_primitive(value, convert_instances=False, level=0):
|
||||
level=level)
|
||||
return o
|
||||
elif isinstance(value, datetime.datetime):
|
||||
return str(value)
|
||||
return timeutils.strtime(value)
|
||||
elif hasattr(value, 'iteritems'):
|
||||
return to_primitive(dict(value.iteritems()),
|
||||
convert_instances=convert_instances,
|
||||
level=level)
|
||||
level=level + 1)
|
||||
elif hasattr(value, '__iter__'):
|
||||
return to_primitive(list(value), level)
|
||||
return to_primitive(list(value),
|
||||
convert_instances=convert_instances,
|
||||
level=level)
|
||||
elif convert_instances and hasattr(value, '__dict__'):
|
||||
# Likely an instance of something. Watch for cycles.
|
||||
# Ignore class member vars.
|
||||
@ -130,11 +134,15 @@ def loads(s):
|
||||
return json.loads(s)
|
||||
|
||||
|
||||
def load(s):
|
||||
return json.load(s)
|
||||
|
||||
|
||||
try:
|
||||
import anyjson
|
||||
except ImportError:
|
||||
pass
|
||||
else:
|
||||
anyjson._modules.append((__name__, 'dumps', TypeError,
|
||||
'loads', ValueError))
|
||||
'loads', ValueError, 'load'))
|
||||
anyjson.force_implementation(__name__)
|
||||
|
@ -41,6 +41,7 @@ import sys
|
||||
import traceback
|
||||
|
||||
from ceilometer.openstack.common import cfg
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common import jsonutils
|
||||
from ceilometer.openstack.common import local
|
||||
from ceilometer.openstack.common import notifier
|
||||
@ -65,13 +66,13 @@ log_opts = [
|
||||
help='prefix each line of exception output with this format'),
|
||||
cfg.ListOpt('default_log_levels',
|
||||
default=[
|
||||
'amqplib=WARN',
|
||||
'sqlalchemy=WARN',
|
||||
'boto=WARN',
|
||||
'suds=INFO',
|
||||
'keystone=INFO',
|
||||
'eventlet.wsgi.server=WARN'
|
||||
],
|
||||
'amqplib=WARN',
|
||||
'sqlalchemy=WARN',
|
||||
'boto=WARN',
|
||||
'suds=INFO',
|
||||
'keystone=INFO',
|
||||
'eventlet.wsgi.server=WARN'
|
||||
],
|
||||
help='list of logger=LEVEL pairs'),
|
||||
cfg.BoolOpt('publish_errors',
|
||||
default=False,
|
||||
@ -88,7 +89,7 @@ log_opts = [
|
||||
default='[instance: %(uuid)s] ',
|
||||
help='If an instance UUID is passed with the log message, '
|
||||
'format it like this'),
|
||||
]
|
||||
]
|
||||
|
||||
|
||||
generic_log_opts = [
|
||||
@ -104,7 +105,7 @@ generic_log_opts = [
|
||||
cfg.StrOpt('logfile_mode',
|
||||
default='0644',
|
||||
help='Default file mode used when creating log files'),
|
||||
]
|
||||
]
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -207,9 +208,9 @@ class JSONFormatter(logging.Formatter):
|
||||
def formatException(self, ei, strip_newlines=True):
|
||||
lines = traceback.format_exception(*ei)
|
||||
if strip_newlines:
|
||||
lines = [itertools.ifilter(lambda x: x,
|
||||
line.rstrip().splitlines())
|
||||
for line in lines]
|
||||
lines = [itertools.ifilter(
|
||||
lambda x: x,
|
||||
line.rstrip().splitlines()) for line in lines]
|
||||
lines = list(itertools.chain(*lines))
|
||||
return lines
|
||||
|
||||
@ -246,26 +247,27 @@ class JSONFormatter(logging.Formatter):
|
||||
|
||||
class PublishErrorsHandler(logging.Handler):
|
||||
def emit(self, record):
|
||||
if 'list_notifier_drivers' in CONF:
|
||||
if ('ceilometer.openstack.common.notifier.log_notifier' in
|
||||
CONF.list_notifier_drivers):
|
||||
return
|
||||
if ('ceilometer.openstack.common.notifier.log_notifier' in
|
||||
CONF.notification_driver):
|
||||
return
|
||||
notifier.api.notify(None, 'error.publisher',
|
||||
'error_notification',
|
||||
notifier.api.ERROR,
|
||||
dict(error=record.msg))
|
||||
'error_notification',
|
||||
notifier.api.ERROR,
|
||||
dict(error=record.msg))
|
||||
|
||||
|
||||
def handle_exception(type, value, tb):
|
||||
extra = {}
|
||||
if CONF.verbose:
|
||||
extra['exc_info'] = (type, value, tb)
|
||||
getLogger().critical(str(value), **extra)
|
||||
def _create_logging_excepthook(product_name):
|
||||
def logging_excepthook(type, value, tb):
|
||||
extra = {}
|
||||
if CONF.verbose:
|
||||
extra['exc_info'] = (type, value, tb)
|
||||
getLogger(product_name).critical(str(value), **extra)
|
||||
return logging_excepthook
|
||||
|
||||
|
||||
def setup(product_name):
|
||||
"""Setup logging."""
|
||||
sys.excepthook = handle_exception
|
||||
sys.excepthook = _create_logging_excepthook(product_name)
|
||||
|
||||
if CONF.log_config:
|
||||
try:
|
||||
@ -356,17 +358,6 @@ def _setup_logging_from_conf(product_name):
|
||||
for handler in log_root.handlers:
|
||||
logger.addHandler(handler)
|
||||
|
||||
# NOTE(jkoelker) Clear the handlers for the root logger that was setup
|
||||
# by basicConfig in nova/__init__.py and install the
|
||||
# NullHandler.
|
||||
root = logging.getLogger()
|
||||
for handler in root.handlers:
|
||||
root.removeHandler(handler)
|
||||
handler = NullHandler()
|
||||
handler.setFormatter(logging.Formatter())
|
||||
root.addHandler(handler)
|
||||
|
||||
|
||||
_loggers = {}
|
||||
|
||||
|
||||
@ -404,8 +395,12 @@ class LegacyFormatter(logging.Formatter):
|
||||
|
||||
def format(self, record):
|
||||
"""Uses contextstring if request_id is set, otherwise default."""
|
||||
if 'instance' not in record.__dict__:
|
||||
record.__dict__['instance'] = ''
|
||||
# NOTE(sdague): default the fancier formating params
|
||||
# to an empty string so we don't throw an exception if
|
||||
# they get used
|
||||
for key in ('instance', 'color'):
|
||||
if key not in record.__dict__:
|
||||
record.__dict__[key] = ''
|
||||
|
||||
if record.__dict__.get('request_id', None):
|
||||
self._fmt = CONF.logging_context_format_string
|
||||
|
@ -18,6 +18,7 @@ import uuid
|
||||
|
||||
from ceilometer.openstack.common import cfg
|
||||
from ceilometer.openstack.common import context
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common import importutils
|
||||
from ceilometer.openstack.common import jsonutils
|
||||
from ceilometer.openstack.common import log as logging
|
||||
@ -27,16 +28,17 @@ from ceilometer.openstack.common import timeutils
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
notifier_opts = [
|
||||
cfg.StrOpt('notification_driver',
|
||||
default='ceilometer.openstack.common.notifier.no_op_notifier',
|
||||
help='Default driver for sending notifications'),
|
||||
cfg.MultiStrOpt('notification_driver',
|
||||
default=[],
|
||||
deprecated_name='list_notifier_drivers',
|
||||
help='Driver or drivers to handle sending notifications'),
|
||||
cfg.StrOpt('default_notification_level',
|
||||
default='INFO',
|
||||
help='Default notification level for outgoing notifications'),
|
||||
cfg.StrOpt('default_publisher_id',
|
||||
default='$host',
|
||||
help='Default publisher_id for outgoing notifications'),
|
||||
]
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(notifier_opts)
|
||||
@ -121,21 +123,60 @@ def notify(context, publisher_id, event_type, priority, payload):
|
||||
"""
|
||||
if priority not in log_levels:
|
||||
raise BadPriorityException(
|
||||
_('%s not in valid priorities') % priority)
|
||||
_('%s not in valid priorities') % priority)
|
||||
|
||||
# Ensure everything is JSON serializable.
|
||||
payload = jsonutils.to_primitive(payload, convert_instances=True)
|
||||
|
||||
driver = importutils.import_module(CONF.notification_driver)
|
||||
msg = dict(message_id=str(uuid.uuid4()),
|
||||
publisher_id=publisher_id,
|
||||
event_type=event_type,
|
||||
priority=priority,
|
||||
payload=payload,
|
||||
timestamp=str(timeutils.utcnow()))
|
||||
try:
|
||||
driver.notify(context, msg)
|
||||
except Exception, e:
|
||||
LOG.exception(_("Problem '%(e)s' attempting to "
|
||||
"send to notification system. Payload=%(payload)s") %
|
||||
locals())
|
||||
publisher_id=publisher_id,
|
||||
event_type=event_type,
|
||||
priority=priority,
|
||||
payload=payload,
|
||||
timestamp=str(timeutils.utcnow()))
|
||||
|
||||
for driver in _get_drivers():
|
||||
try:
|
||||
driver.notify(context, msg)
|
||||
except Exception, e:
|
||||
LOG.exception(_("Problem '%(e)s' attempting to "
|
||||
"send to notification system. Payload=%(payload)s") %
|
||||
locals())
|
||||
|
||||
|
||||
_drivers = None
|
||||
|
||||
|
||||
def _get_drivers():
|
||||
"""Instantiate, cache, and return drivers based on the CONF."""
|
||||
global _drivers
|
||||
if _drivers is None:
|
||||
_drivers = {}
|
||||
for notification_driver in CONF.notification_driver:
|
||||
add_driver(notification_driver)
|
||||
|
||||
return _drivers.values()
|
||||
|
||||
|
||||
def add_driver(notification_driver):
|
||||
"""Add a notification driver at runtime."""
|
||||
# Make sure the driver list is initialized.
|
||||
_get_drivers()
|
||||
if isinstance(notification_driver, basestring):
|
||||
# Load and add
|
||||
try:
|
||||
driver = importutils.import_module(notification_driver)
|
||||
_drivers[notification_driver] = driver
|
||||
except ImportError as e:
|
||||
LOG.exception(_("Failed to load notifier %s. "
|
||||
"These notifications will not be sent.") %
|
||||
notification_driver)
|
||||
else:
|
||||
# Driver is already loaded; just add the object.
|
||||
_drivers[notification_driver] = notification_driver
|
||||
|
||||
|
||||
def _reset_drivers():
|
||||
"""Used by unit tests to reset the drivers."""
|
||||
global _drivers
|
||||
_drivers = None
|
||||
|
@ -14,13 +14,15 @@
|
||||
# under the License.
|
||||
|
||||
from ceilometer.openstack.common import cfg
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common import importutils
|
||||
from ceilometer.openstack.common import log as logging
|
||||
|
||||
|
||||
list_notifier_drivers_opt = cfg.MultiStrOpt('list_notifier_drivers',
|
||||
default=['ceilometer.openstack.common.notifier.no_op_notifier'],
|
||||
help='List of drivers to send notifications')
|
||||
list_notifier_drivers_opt = cfg.MultiStrOpt(
|
||||
'list_notifier_drivers',
|
||||
default=['ceilometer.openstack.common.notifier.no_op_notifier'],
|
||||
help='List of drivers to send notifications')
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opt(list_notifier_drivers_opt)
|
||||
|
@ -30,6 +30,6 @@ def notify(_context, message):
|
||||
CONF.default_notification_level)
|
||||
priority = priority.lower()
|
||||
logger = logging.getLogger(
|
||||
'ceilometer.openstack.common.notification.%s'
|
||||
% message['event_type'])
|
||||
'ceilometer.openstack.common.notification.%s' %
|
||||
message['event_type'])
|
||||
getattr(logger, priority)(jsonutils.dumps(message))
|
||||
|
@ -16,14 +16,15 @@
|
||||
|
||||
from ceilometer.openstack.common import cfg
|
||||
from ceilometer.openstack.common import context as req_context
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common import log as logging
|
||||
from ceilometer.openstack.common import rpc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
notification_topic_opt = cfg.ListOpt('notification_topics',
|
||||
default=['notifications', ],
|
||||
help='AMQP topic used for openstack notifications')
|
||||
notification_topic_opt = cfg.ListOpt(
|
||||
'notification_topics', default=['notifications', ],
|
||||
help='AMQP topic used for openstack notifications')
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opt(notification_topic_opt)
|
||||
|
@ -48,7 +48,8 @@ rpc_opts = [
|
||||
'Only supported by impl_zmq.'),
|
||||
cfg.ListOpt('allowed_rpc_exception_modules',
|
||||
default=['ceilometer.openstack.common.exception',
|
||||
'nova.exception'],
|
||||
'nova.exception',
|
||||
],
|
||||
help='Modules of exceptions that are permitted to be recreated'
|
||||
'upon receiving exception data from an rpc call.'),
|
||||
cfg.StrOpt('control_exchange',
|
||||
|
@ -35,6 +35,7 @@ from eventlet import pools
|
||||
from eventlet import semaphore
|
||||
|
||||
from ceilometer.openstack.common import excutils
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common import local
|
||||
from ceilometer.openstack.common.rpc import common as rpc_common
|
||||
|
||||
|
@ -23,10 +23,10 @@ import sys
|
||||
import traceback
|
||||
|
||||
from ceilometer.openstack.common import cfg
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common import importutils
|
||||
from ceilometer.openstack.common import jsonutils
|
||||
from ceilometer.openstack.common import local
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -108,7 +108,7 @@ class Connection(object):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def create_consumer(self, conf, topic, proxy, fanout=False):
|
||||
def create_consumer(self, topic, proxy, fanout=False):
|
||||
"""Create a consumer on this connection.
|
||||
|
||||
A consumer is associated with a message queue on the backend message
|
||||
@ -117,7 +117,6 @@ class Connection(object):
|
||||
off of the queue will determine which method gets called on the proxy
|
||||
object.
|
||||
|
||||
:param conf: An openstack.common.cfg configuration object.
|
||||
:param topic: This is a name associated with what to consume from.
|
||||
Multiple instances of a service may consume from the same
|
||||
topic. For example, all instances of nova-compute consume
|
||||
@ -133,7 +132,7 @@ class Connection(object):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def create_worker(self, conf, topic, proxy, pool_name):
|
||||
def create_worker(self, topic, proxy, pool_name):
|
||||
"""Create a worker on this connection.
|
||||
|
||||
A worker is like a regular consumer of messages directed to a
|
||||
@ -143,7 +142,6 @@ class Connection(object):
|
||||
be asked to process it. Load is distributed across the members
|
||||
of the pool in round-robin fashion.
|
||||
|
||||
:param conf: An openstack.common.cfg configuration object.
|
||||
:param topic: This is a name associated with what to consume from.
|
||||
Multiple instances of a service may consume from the same
|
||||
topic.
|
||||
|
@ -40,6 +40,45 @@ The conversion over to a versioned API must be done on both the client side and
|
||||
server side of the API at the same time. However, as the code stands today,
|
||||
there can be both versioned and unversioned APIs implemented in the same code
|
||||
base.
|
||||
|
||||
|
||||
EXAMPLES:
|
||||
|
||||
Nova was the first project to use versioned rpc APIs. Consider the compute rpc
|
||||
API as an example. The client side is in nova/compute/rpcapi.py and the server
|
||||
side is in nova/compute/manager.py.
|
||||
|
||||
|
||||
Example 1) Adding a new method.
|
||||
|
||||
Adding a new method is a backwards compatible change. It should be added to
|
||||
nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
|
||||
X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should
|
||||
have a specific version specified to indicate the minimum API version that must
|
||||
be implemented for the method to be supported. For example:
|
||||
|
||||
def get_host_uptime(self, ctxt, host):
|
||||
topic = _compute_topic(self.topic, ctxt, host, None)
|
||||
return self.call(ctxt, self.make_msg('get_host_uptime'), topic,
|
||||
version='1.1')
|
||||
|
||||
In this case, version '1.1' is the first version that supported the
|
||||
get_host_uptime() method.
|
||||
|
||||
|
||||
Example 2) Adding a new parameter.
|
||||
|
||||
Adding a new parameter to an rpc method can be made backwards compatible. The
|
||||
RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
|
||||
The implementation of the method must not expect the parameter to be present.
|
||||
|
||||
def some_remote_method(self, arg1, arg2, newarg=None):
|
||||
# The code needs to deal with newarg=None for cases
|
||||
# where an older client sends a message without it.
|
||||
pass
|
||||
|
||||
On the client side, the same changes should be made as in example 1. The
|
||||
minimum version that supports the new parameter should be specified.
|
||||
"""
|
||||
|
||||
from ceilometer.openstack.common.rpc import common as rpc_common
|
||||
|
@ -17,24 +17,18 @@
|
||||
|
||||
import functools
|
||||
import itertools
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import eventlet
|
||||
import greenlet
|
||||
try:
|
||||
import qpid.messaging
|
||||
import qpid.messaging.exceptions
|
||||
except ImportError:
|
||||
# FIXME(dhellmann): Trying to prevent errors
|
||||
# running the tests on stackforge, where qpid
|
||||
# is not installed.
|
||||
pass
|
||||
import qpid.messaging
|
||||
import qpid.messaging.exceptions
|
||||
|
||||
from ceilometer.openstack.common import cfg
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common import jsonutils
|
||||
from ceilometer.openstack.common.rpc import amqp as rpc_amqp
|
||||
from ceilometer.openstack.common.rpc import common as rpc_common
|
||||
|
||||
@ -131,7 +125,7 @@ class ConsumerBase(object):
|
||||
addr_opts["node"]["x-declare"].update(node_opts)
|
||||
addr_opts["link"]["x-declare"].update(link_opts)
|
||||
|
||||
self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
|
||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||
|
||||
self.reconnect(session)
|
||||
|
||||
@ -147,7 +141,7 @@ class ConsumerBase(object):
|
||||
try:
|
||||
self.callback(message.content)
|
||||
except Exception:
|
||||
logging.exception(_("Failed to process message... skipping it."))
|
||||
LOG.exception(_("Failed to process message... skipping it."))
|
||||
finally:
|
||||
self.session.acknowledge(message)
|
||||
|
||||
@ -236,7 +230,7 @@ class Publisher(object):
|
||||
if node_opts:
|
||||
addr_opts["node"]["x-declare"].update(node_opts)
|
||||
|
||||
self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
|
||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||
|
||||
self.reconnect(session)
|
||||
|
||||
@ -335,7 +329,7 @@ class Connection(object):
|
||||
if self.conf.qpid_reconnect_interval:
|
||||
self.connection.reconnect_interval = (
|
||||
self.conf.qpid_reconnect_interval)
|
||||
self.connection.hearbeat = self.conf.qpid_heartbeat
|
||||
self.connection.heartbeat = self.conf.qpid_heartbeat
|
||||
self.connection.protocol = self.conf.qpid_protocol
|
||||
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
|
||||
|
||||
|
@ -47,10 +47,12 @@ zmq_opts = [
|
||||
'address.'),
|
||||
|
||||
# The module.Class to use for matchmaking.
|
||||
cfg.StrOpt('rpc_zmq_matchmaker',
|
||||
default='ceilometer.'
|
||||
'openstack.common.rpc.matchmaker.MatchMakerLocalhost',
|
||||
help='MatchMaker driver'),
|
||||
cfg.StrOpt(
|
||||
'rpc_zmq_matchmaker',
|
||||
default=('ceilometer.openstack.common.rpc.'
|
||||
'matchmaker.MatchMakerLocalhost'),
|
||||
help='MatchMaker driver',
|
||||
),
|
||||
|
||||
# The following port is unassigned by IANA as of 2012-05-21
|
||||
cfg.IntOpt('rpc_zmq_port', default=9501,
|
||||
@ -61,9 +63,10 @@ zmq_opts = [
|
||||
|
||||
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
|
||||
help='Directory for holding IPC sockets'),
|
||||
|
||||
cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
|
||||
help='Name of this node. Must be a valid hostname, FQDN, or '
|
||||
'IP address')
|
||||
'IP address. Must match "host" option, if running Nova.')
|
||||
]
|
||||
|
||||
|
||||
@ -128,7 +131,7 @@ class ZmqSocket(object):
|
||||
'subscribe': subscribe, 'bind': bind}
|
||||
|
||||
LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
|
||||
LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
|
||||
LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
|
||||
LOG.debug(_("-> bind: %(bind)s"), str_data)
|
||||
|
||||
try:
|
||||
@ -717,3 +720,6 @@ def register_opts(conf):
|
||||
mm_impl = importutils.import_module(mm_module)
|
||||
mm_constructor = getattr(mm_impl, mm_class)
|
||||
matchmaker = mm_constructor()
|
||||
|
||||
|
||||
register_opts(cfg.CONF)
|
||||
|
@ -24,6 +24,7 @@ import json
|
||||
import logging
|
||||
|
||||
from ceilometer.openstack.common import cfg
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
|
||||
|
||||
matchmaker_opts = [
|
||||
|
@ -112,11 +112,12 @@ class RpcProxy(object):
|
||||
self._set_version(msg, version)
|
||||
rpc.cast(context, self._get_topic(topic), msg)
|
||||
|
||||
def fanout_cast(self, context, msg, version=None):
|
||||
def fanout_cast(self, context, msg, topic=None, version=None):
|
||||
"""rpc.fanout_cast() a remote method.
|
||||
|
||||
:param context: The request context
|
||||
:param msg: The message to send, including the method and args.
|
||||
:param topic: Override the topic for this message.
|
||||
:param version: (Optional) Override the requested API version in this
|
||||
message.
|
||||
|
||||
@ -124,7 +125,7 @@ class RpcProxy(object):
|
||||
from the remote method.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
rpc.fanout_cast(context, self.topic, msg)
|
||||
rpc.fanout_cast(context, self._get_topic(topic), msg)
|
||||
|
||||
def cast_to_server(self, context, server_params, msg, topic=None,
|
||||
version=None):
|
||||
@ -144,13 +145,15 @@ class RpcProxy(object):
|
||||
self._set_version(msg, version)
|
||||
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
|
||||
|
||||
def fanout_cast_to_server(self, context, server_params, msg, version=None):
|
||||
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
|
||||
version=None):
|
||||
"""rpc.fanout_cast_to_server() a remote method.
|
||||
|
||||
:param context: The request context
|
||||
:param server_params: Server parameters. See rpc.cast_to_server() for
|
||||
details.
|
||||
:param msg: The message to send, including the method and args.
|
||||
:param topic: Override the topic for this message.
|
||||
:param version: (Optional) Override the requested API version in this
|
||||
message.
|
||||
|
||||
@ -158,4 +161,5 @@ class RpcProxy(object):
|
||||
return values.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
rpc.fanout_cast_to_server(context, server_params, self.topic, msg)
|
||||
rpc.fanout_cast_to_server(context, server_params,
|
||||
self._get_topic(topic), msg)
|
||||
|
@ -21,7 +21,6 @@ Time related utilities and helper functions.
|
||||
|
||||
import calendar
|
||||
import datetime
|
||||
import time
|
||||
|
||||
import iso8601
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user