docstring cleanup, nova dir
This commit is contained in:
@@ -18,14 +18,14 @@
|
|||||||
|
|
||||||
"""Super simple fake memcache client."""
|
"""Super simple fake memcache client."""
|
||||||
|
|
||||||
import utils
|
from nova import utils
|
||||||
|
|
||||||
|
|
||||||
class Client(object):
|
class Client(object):
|
||||||
"""Replicates a tiny subset of memcached client interface."""
|
"""Replicates a tiny subset of memcached client interface."""
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
"""Ignores the passed in args"""
|
"""Ignores the passed in args."""
|
||||||
self.cache = {}
|
self.cache = {}
|
||||||
|
|
||||||
def get(self, key):
|
def get(self, key):
|
||||||
|
|||||||
@@ -16,9 +16,13 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
"""
|
"""Command-line flag library.
|
||||||
|
|
||||||
|
Wraps gflags.
|
||||||
|
|
||||||
Package-level global flags are defined here, the rest are defined
|
Package-level global flags are defined here, the rest are defined
|
||||||
where they're used.
|
where they're used.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import getopt
|
import getopt
|
||||||
@@ -145,10 +149,12 @@ class FlagValues(gflags.FlagValues):
|
|||||||
|
|
||||||
|
|
||||||
class StrWrapper(object):
|
class StrWrapper(object):
|
||||||
"""Wrapper around FlagValues objects
|
"""Wrapper around FlagValues objects.
|
||||||
|
|
||||||
Wraps FlagValues objects for string.Template so that we're
|
Wraps FlagValues objects for string.Template so that we're
|
||||||
sure to return strings."""
|
sure to return strings.
|
||||||
|
|
||||||
|
"""
|
||||||
def __init__(self, context_objs):
|
def __init__(self, context_objs):
|
||||||
self.context_objs = context_objs
|
self.context_objs = context_objs
|
||||||
|
|
||||||
@@ -169,6 +175,7 @@ def _GetCallingModule():
|
|||||||
|
|
||||||
We generally use this function to get the name of the module calling a
|
We generally use this function to get the name of the module calling a
|
||||||
DEFINE_foo... function.
|
DEFINE_foo... function.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# Walk down the stack to find the first globals dict that's not ours.
|
# Walk down the stack to find the first globals dict that's not ours.
|
||||||
for depth in range(1, sys.getrecursionlimit()):
|
for depth in range(1, sys.getrecursionlimit()):
|
||||||
@@ -192,6 +199,7 @@ def __GetModuleName(globals_dict):
|
|||||||
Returns:
|
Returns:
|
||||||
A string (the name of the module) or None (if the module could not
|
A string (the name of the module) or None (if the module could not
|
||||||
be identified.
|
be identified.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
for name, module in sys.modules.iteritems():
|
for name, module in sys.modules.iteritems():
|
||||||
if getattr(module, '__dict__', None) is globals_dict:
|
if getattr(module, '__dict__', None) is globals_dict:
|
||||||
@@ -326,7 +334,7 @@ DEFINE_integer('auth_token_ttl', 3600, 'Seconds for auth tokens to linger')
|
|||||||
DEFINE_string('state_path', os.path.join(os.path.dirname(__file__), '../'),
|
DEFINE_string('state_path', os.path.join(os.path.dirname(__file__), '../'),
|
||||||
"Top-level directory for maintaining nova's state")
|
"Top-level directory for maintaining nova's state")
|
||||||
DEFINE_string('lock_path', os.path.join(os.path.dirname(__file__), '../'),
|
DEFINE_string('lock_path', os.path.join(os.path.dirname(__file__), '../'),
|
||||||
"Directory for lock files")
|
'Directory for lock files')
|
||||||
DEFINE_string('logdir', None, 'output to a per-service log file in named '
|
DEFINE_string('logdir', None, 'output to a per-service log file in named '
|
||||||
'directory')
|
'directory')
|
||||||
|
|
||||||
|
|||||||
49
nova/log.py
49
nova/log.py
@@ -16,16 +16,15 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
"""
|
"""Nova logging handler.
|
||||||
Nova logging handler.
|
|
||||||
|
|
||||||
This module adds to logging functionality by adding the option to specify
|
This module adds to logging functionality by adding the option to specify
|
||||||
a context object when calling the various log methods. If the context object
|
a context object when calling the various log methods. If the context object
|
||||||
is not specified, default formatting is used.
|
is not specified, default formatting is used.
|
||||||
|
|
||||||
It also allows setting of formatting information through flags.
|
It also allows setting of formatting information through flags.
|
||||||
"""
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
import cStringIO
|
import cStringIO
|
||||||
import inspect
|
import inspect
|
||||||
@@ -41,34 +40,28 @@ from nova import version
|
|||||||
|
|
||||||
|
|
||||||
FLAGS = flags.FLAGS
|
FLAGS = flags.FLAGS
|
||||||
|
|
||||||
flags.DEFINE_string('logging_context_format_string',
|
flags.DEFINE_string('logging_context_format_string',
|
||||||
'%(asctime)s %(levelname)s %(name)s '
|
'%(asctime)s %(levelname)s %(name)s '
|
||||||
'[%(request_id)s %(user)s '
|
'[%(request_id)s %(user)s '
|
||||||
'%(project)s] %(message)s',
|
'%(project)s] %(message)s',
|
||||||
'format string to use for log messages with context')
|
'format string to use for log messages with context')
|
||||||
|
|
||||||
flags.DEFINE_string('logging_default_format_string',
|
flags.DEFINE_string('logging_default_format_string',
|
||||||
'%(asctime)s %(levelname)s %(name)s [-] '
|
'%(asctime)s %(levelname)s %(name)s [-] '
|
||||||
'%(message)s',
|
'%(message)s',
|
||||||
'format string to use for log messages without context')
|
'format string to use for log messages without context')
|
||||||
|
|
||||||
flags.DEFINE_string('logging_debug_format_suffix',
|
flags.DEFINE_string('logging_debug_format_suffix',
|
||||||
'from (pid=%(process)d) %(funcName)s'
|
'from (pid=%(process)d) %(funcName)s'
|
||||||
' %(pathname)s:%(lineno)d',
|
' %(pathname)s:%(lineno)d',
|
||||||
'data to append to log format when level is DEBUG')
|
'data to append to log format when level is DEBUG')
|
||||||
|
|
||||||
flags.DEFINE_string('logging_exception_prefix',
|
flags.DEFINE_string('logging_exception_prefix',
|
||||||
'(%(name)s): TRACE: ',
|
'(%(name)s): TRACE: ',
|
||||||
'prefix each line of exception output with this format')
|
'prefix each line of exception output with this format')
|
||||||
|
|
||||||
flags.DEFINE_list('default_log_levels',
|
flags.DEFINE_list('default_log_levels',
|
||||||
['amqplib=WARN',
|
['amqplib=WARN',
|
||||||
'sqlalchemy=WARN',
|
'sqlalchemy=WARN',
|
||||||
'boto=WARN',
|
'boto=WARN',
|
||||||
'eventlet.wsgi.server=WARN'],
|
'eventlet.wsgi.server=WARN'],
|
||||||
'list of logger=LEVEL pairs')
|
'list of logger=LEVEL pairs')
|
||||||
|
|
||||||
flags.DEFINE_bool('use_syslog', False, 'output to syslog')
|
flags.DEFINE_bool('use_syslog', False, 'output to syslog')
|
||||||
flags.DEFINE_string('logfile', None, 'output to named file')
|
flags.DEFINE_string('logfile', None, 'output to named file')
|
||||||
|
|
||||||
@@ -83,6 +76,8 @@ WARN = logging.WARN
|
|||||||
INFO = logging.INFO
|
INFO = logging.INFO
|
||||||
DEBUG = logging.DEBUG
|
DEBUG = logging.DEBUG
|
||||||
NOTSET = logging.NOTSET
|
NOTSET = logging.NOTSET
|
||||||
|
|
||||||
|
|
||||||
# methods
|
# methods
|
||||||
getLogger = logging.getLogger
|
getLogger = logging.getLogger
|
||||||
debug = logging.debug
|
debug = logging.debug
|
||||||
@@ -93,6 +88,8 @@ error = logging.error
|
|||||||
exception = logging.exception
|
exception = logging.exception
|
||||||
critical = logging.critical
|
critical = logging.critical
|
||||||
log = logging.log
|
log = logging.log
|
||||||
|
|
||||||
|
|
||||||
# handlers
|
# handlers
|
||||||
StreamHandler = logging.StreamHandler
|
StreamHandler = logging.StreamHandler
|
||||||
WatchedFileHandler = logging.handlers.WatchedFileHandler
|
WatchedFileHandler = logging.handlers.WatchedFileHandler
|
||||||
@@ -127,17 +124,18 @@ def _get_log_file_path(binary=None):
|
|||||||
|
|
||||||
|
|
||||||
class NovaLogger(logging.Logger):
|
class NovaLogger(logging.Logger):
|
||||||
"""
|
"""NovaLogger manages request context and formatting.
|
||||||
NovaLogger manages request context and formatting.
|
|
||||||
|
|
||||||
This becomes the class that is instanciated by logging.getLogger.
|
This becomes the class that is instanciated by logging.getLogger.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, name, level=NOTSET):
|
def __init__(self, name, level=NOTSET):
|
||||||
logging.Logger.__init__(self, name, level)
|
logging.Logger.__init__(self, name, level)
|
||||||
self.setup_from_flags()
|
self.setup_from_flags()
|
||||||
|
|
||||||
def setup_from_flags(self):
|
def setup_from_flags(self):
|
||||||
"""Setup logger from flags"""
|
"""Setup logger from flags."""
|
||||||
level = NOTSET
|
level = NOTSET
|
||||||
for pair in FLAGS.default_log_levels:
|
for pair in FLAGS.default_log_levels:
|
||||||
logger, _sep, level_name = pair.partition('=')
|
logger, _sep, level_name = pair.partition('=')
|
||||||
@@ -148,7 +146,7 @@ class NovaLogger(logging.Logger):
|
|||||||
self.setLevel(level)
|
self.setLevel(level)
|
||||||
|
|
||||||
def _log(self, level, msg, args, exc_info=None, extra=None, context=None):
|
def _log(self, level, msg, args, exc_info=None, extra=None, context=None):
|
||||||
"""Extract context from any log call"""
|
"""Extract context from any log call."""
|
||||||
if not extra:
|
if not extra:
|
||||||
extra = {}
|
extra = {}
|
||||||
if context:
|
if context:
|
||||||
@@ -157,17 +155,17 @@ class NovaLogger(logging.Logger):
|
|||||||
return logging.Logger._log(self, level, msg, args, exc_info, extra)
|
return logging.Logger._log(self, level, msg, args, exc_info, extra)
|
||||||
|
|
||||||
def addHandler(self, handler):
|
def addHandler(self, handler):
|
||||||
"""Each handler gets our custom formatter"""
|
"""Each handler gets our custom formatter."""
|
||||||
handler.setFormatter(_formatter)
|
handler.setFormatter(_formatter)
|
||||||
return logging.Logger.addHandler(self, handler)
|
return logging.Logger.addHandler(self, handler)
|
||||||
|
|
||||||
def audit(self, msg, *args, **kwargs):
|
def audit(self, msg, *args, **kwargs):
|
||||||
"""Shortcut for our AUDIT level"""
|
"""Shortcut for our AUDIT level."""
|
||||||
if self.isEnabledFor(AUDIT):
|
if self.isEnabledFor(AUDIT):
|
||||||
self._log(AUDIT, msg, args, **kwargs)
|
self._log(AUDIT, msg, args, **kwargs)
|
||||||
|
|
||||||
def exception(self, msg, *args, **kwargs):
|
def exception(self, msg, *args, **kwargs):
|
||||||
"""Logging.exception doesn't handle kwargs, so breaks context"""
|
"""Logging.exception doesn't handle kwargs, so breaks context."""
|
||||||
if not kwargs.get('exc_info'):
|
if not kwargs.get('exc_info'):
|
||||||
kwargs['exc_info'] = 1
|
kwargs['exc_info'] = 1
|
||||||
self.error(msg, *args, **kwargs)
|
self.error(msg, *args, **kwargs)
|
||||||
@@ -181,14 +179,13 @@ class NovaLogger(logging.Logger):
|
|||||||
for k in env.keys():
|
for k in env.keys():
|
||||||
if not isinstance(env[k], str):
|
if not isinstance(env[k], str):
|
||||||
env.pop(k)
|
env.pop(k)
|
||||||
message = "Environment: %s" % json.dumps(env)
|
message = 'Environment: %s' % json.dumps(env)
|
||||||
kwargs.pop('exc_info')
|
kwargs.pop('exc_info')
|
||||||
self.error(message, **kwargs)
|
self.error(message, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class NovaFormatter(logging.Formatter):
|
class NovaFormatter(logging.Formatter):
|
||||||
"""
|
"""A nova.context.RequestContext aware formatter configured through flags.
|
||||||
A nova.context.RequestContext aware formatter configured through flags.
|
|
||||||
|
|
||||||
The flags used to set format strings are: logging_context_foramt_string
|
The flags used to set format strings are: logging_context_foramt_string
|
||||||
and logging_default_format_string. You can also specify
|
and logging_default_format_string. You can also specify
|
||||||
@@ -197,10 +194,11 @@ class NovaFormatter(logging.Formatter):
|
|||||||
|
|
||||||
For information about what variables are available for the formatter see:
|
For information about what variables are available for the formatter see:
|
||||||
http://docs.python.org/library/logging.html#formatter
|
http://docs.python.org/library/logging.html#formatter
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def format(self, record):
|
def format(self, record):
|
||||||
"""Uses contextstring if request_id is set, otherwise default"""
|
"""Uses contextstring if request_id is set, otherwise default."""
|
||||||
if record.__dict__.get('request_id', None):
|
if record.__dict__.get('request_id', None):
|
||||||
self._fmt = FLAGS.logging_context_format_string
|
self._fmt = FLAGS.logging_context_format_string
|
||||||
else:
|
else:
|
||||||
@@ -214,20 +212,21 @@ class NovaFormatter(logging.Formatter):
|
|||||||
return logging.Formatter.format(self, record)
|
return logging.Formatter.format(self, record)
|
||||||
|
|
||||||
def formatException(self, exc_info, record=None):
|
def formatException(self, exc_info, record=None):
|
||||||
"""Format exception output with FLAGS.logging_exception_prefix"""
|
"""Format exception output with FLAGS.logging_exception_prefix."""
|
||||||
if not record:
|
if not record:
|
||||||
return logging.Formatter.formatException(self, exc_info)
|
return logging.Formatter.formatException(self, exc_info)
|
||||||
stringbuffer = cStringIO.StringIO()
|
stringbuffer = cStringIO.StringIO()
|
||||||
traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
|
traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
|
||||||
None, stringbuffer)
|
None, stringbuffer)
|
||||||
lines = stringbuffer.getvalue().split("\n")
|
lines = stringbuffer.getvalue().split('\n')
|
||||||
stringbuffer.close()
|
stringbuffer.close()
|
||||||
formatted_lines = []
|
formatted_lines = []
|
||||||
for line in lines:
|
for line in lines:
|
||||||
pl = FLAGS.logging_exception_prefix % record.__dict__
|
pl = FLAGS.logging_exception_prefix % record.__dict__
|
||||||
fl = "%s%s" % (pl, line)
|
fl = '%s%s' % (pl, line)
|
||||||
formatted_lines.append(fl)
|
formatted_lines.append(fl)
|
||||||
return "\n".join(formatted_lines)
|
return '\n'.join(formatted_lines)
|
||||||
|
|
||||||
|
|
||||||
_formatter = NovaFormatter()
|
_formatter = NovaFormatter()
|
||||||
|
|
||||||
@@ -241,7 +240,7 @@ class NovaRootLogger(NovaLogger):
|
|||||||
NovaLogger.__init__(self, name, level)
|
NovaLogger.__init__(self, name, level)
|
||||||
|
|
||||||
def setup_from_flags(self):
|
def setup_from_flags(self):
|
||||||
"""Setup logger from flags"""
|
"""Setup logger from flags."""
|
||||||
global _filelog
|
global _filelog
|
||||||
if FLAGS.use_syslog:
|
if FLAGS.use_syslog:
|
||||||
self.syslog = SysLogHandler(address='/dev/log')
|
self.syslog = SysLogHandler(address='/dev/log')
|
||||||
|
|||||||
152
nova/rpc.py
152
nova/rpc.py
@@ -16,9 +16,12 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
"""
|
"""AMQP-based RPC.
|
||||||
AMQP-based RPC. Queues have consumers and publishers.
|
|
||||||
|
Queues have consumers and publishers.
|
||||||
|
|
||||||
No fan-out support yet.
|
No fan-out support yet.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
@@ -40,17 +43,19 @@ from nova import log as logging
|
|||||||
from nova import utils
|
from nova import utils
|
||||||
|
|
||||||
|
|
||||||
FLAGS = flags.FLAGS
|
|
||||||
LOG = logging.getLogger('nova.rpc')
|
LOG = logging.getLogger('nova.rpc')
|
||||||
|
|
||||||
|
|
||||||
|
FLAGS = flags.FLAGS
|
||||||
flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool')
|
flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool')
|
||||||
|
|
||||||
|
|
||||||
class Connection(carrot_connection.BrokerConnection):
|
class Connection(carrot_connection.BrokerConnection):
|
||||||
"""Connection instance object"""
|
"""Connection instance object."""
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def instance(cls, new=True):
|
def instance(cls, new=True):
|
||||||
"""Returns the instance"""
|
"""Returns the instance."""
|
||||||
if new or not hasattr(cls, '_instance'):
|
if new or not hasattr(cls, '_instance'):
|
||||||
params = dict(hostname=FLAGS.rabbit_host,
|
params = dict(hostname=FLAGS.rabbit_host,
|
||||||
port=FLAGS.rabbit_port,
|
port=FLAGS.rabbit_port,
|
||||||
@@ -71,9 +76,11 @@ class Connection(carrot_connection.BrokerConnection):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def recreate(cls):
|
def recreate(cls):
|
||||||
"""Recreates the connection instance
|
"""Recreates the connection instance.
|
||||||
|
|
||||||
This is necessary to recover from some network errors/disconnects"""
|
This is necessary to recover from some network errors/disconnects.
|
||||||
|
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
del cls._instance
|
del cls._instance
|
||||||
except AttributeError, e:
|
except AttributeError, e:
|
||||||
@@ -84,10 +91,12 @@ class Connection(carrot_connection.BrokerConnection):
|
|||||||
|
|
||||||
|
|
||||||
class Consumer(messaging.Consumer):
|
class Consumer(messaging.Consumer):
|
||||||
"""Consumer base class
|
"""Consumer base class.
|
||||||
|
|
||||||
|
Contains methods for connecting the fetch method to async loops.
|
||||||
|
|
||||||
Contains methods for connecting the fetch method to async loops
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
for i in xrange(FLAGS.rabbit_max_retries):
|
for i in xrange(FLAGS.rabbit_max_retries):
|
||||||
if i > 0:
|
if i > 0:
|
||||||
@@ -100,19 +109,18 @@ class Consumer(messaging.Consumer):
|
|||||||
fl_host = FLAGS.rabbit_host
|
fl_host = FLAGS.rabbit_host
|
||||||
fl_port = FLAGS.rabbit_port
|
fl_port = FLAGS.rabbit_port
|
||||||
fl_intv = FLAGS.rabbit_retry_interval
|
fl_intv = FLAGS.rabbit_retry_interval
|
||||||
LOG.error(_("AMQP server on %(fl_host)s:%(fl_port)d is"
|
LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
|
||||||
" unreachable: %(e)s. Trying again in %(fl_intv)d"
|
' unreachable: %(e)s. Trying again in %(fl_intv)d'
|
||||||
" seconds.")
|
' seconds.') % locals())
|
||||||
% locals())
|
|
||||||
self.failed_connection = True
|
self.failed_connection = True
|
||||||
if self.failed_connection:
|
if self.failed_connection:
|
||||||
LOG.error(_("Unable to connect to AMQP server "
|
LOG.error(_('Unable to connect to AMQP server '
|
||||||
"after %d tries. Shutting down."),
|
'after %d tries. Shutting down.'),
|
||||||
FLAGS.rabbit_max_retries)
|
FLAGS.rabbit_max_retries)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
||||||
"""Wraps the parent fetch with some logic for failed connections"""
|
"""Wraps the parent fetch with some logic for failed connection."""
|
||||||
# TODO(vish): the logic for failed connections and logging should be
|
# TODO(vish): the logic for failed connections and logging should be
|
||||||
# refactored into some sort of connection manager object
|
# refactored into some sort of connection manager object
|
||||||
try:
|
try:
|
||||||
@@ -125,14 +133,14 @@ class Consumer(messaging.Consumer):
|
|||||||
self.declare()
|
self.declare()
|
||||||
super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
|
super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
|
||||||
if self.failed_connection:
|
if self.failed_connection:
|
||||||
LOG.error(_("Reconnected to queue"))
|
LOG.error(_('Reconnected to queue'))
|
||||||
self.failed_connection = False
|
self.failed_connection = False
|
||||||
# NOTE(vish): This is catching all errors because we really don't
|
# NOTE(vish): This is catching all errors because we really don't
|
||||||
# want exceptions to be logged 10 times a second if some
|
# want exceptions to be logged 10 times a second if some
|
||||||
# persistent failure occurs.
|
# persistent failure occurs.
|
||||||
except Exception, e: # pylint: disable=W0703
|
except Exception, e: # pylint: disable=W0703
|
||||||
if not self.failed_connection:
|
if not self.failed_connection:
|
||||||
LOG.exception(_("Failed to fetch message from queue: %s" % e))
|
LOG.exception(_('Failed to fetch message from queue: %s' % e))
|
||||||
self.failed_connection = True
|
self.failed_connection = True
|
||||||
|
|
||||||
def attach_to_eventlet(self):
|
def attach_to_eventlet(self):
|
||||||
@@ -143,8 +151,9 @@ class Consumer(messaging.Consumer):
|
|||||||
|
|
||||||
|
|
||||||
class AdapterConsumer(Consumer):
|
class AdapterConsumer(Consumer):
|
||||||
"""Calls methods on a proxy object based on method and args"""
|
"""Calls methods on a proxy object based on method and args."""
|
||||||
def __init__(self, connection=None, topic="broadcast", proxy=None):
|
|
||||||
|
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
||||||
LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
|
LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
|
||||||
self.proxy = proxy
|
self.proxy = proxy
|
||||||
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
|
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
|
||||||
@@ -156,13 +165,14 @@ class AdapterConsumer(Consumer):
|
|||||||
|
|
||||||
@exception.wrap_exception
|
@exception.wrap_exception
|
||||||
def _receive(self, message_data, message):
|
def _receive(self, message_data, message):
|
||||||
"""Magically looks for a method on the proxy object and calls it
|
"""Magically looks for a method on the proxy object and calls it.
|
||||||
|
|
||||||
Message data should be a dictionary with two keys:
|
Message data should be a dictionary with two keys:
|
||||||
method: string representing the method to call
|
method: string representing the method to call
|
||||||
args: dictionary of arg: value
|
args: dictionary of arg: value
|
||||||
|
|
||||||
Example: {'method': 'echo', 'args': {'value': 42}}
|
Example: {'method': 'echo', 'args': {'value': 42}}
|
||||||
|
|
||||||
"""
|
"""
|
||||||
LOG.debug(_('received %s') % message_data)
|
LOG.debug(_('received %s') % message_data)
|
||||||
msg_id = message_data.pop('_msg_id', None)
|
msg_id = message_data.pop('_msg_id', None)
|
||||||
@@ -189,22 +199,23 @@ class AdapterConsumer(Consumer):
|
|||||||
if msg_id:
|
if msg_id:
|
||||||
msg_reply(msg_id, rval, None)
|
msg_reply(msg_id, rval, None)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.exception("Exception during message handling")
|
logging.exception('Exception during message handling')
|
||||||
if msg_id:
|
if msg_id:
|
||||||
msg_reply(msg_id, None, sys.exc_info())
|
msg_reply(msg_id, None, sys.exc_info())
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
class Publisher(messaging.Publisher):
|
class Publisher(messaging.Publisher):
|
||||||
"""Publisher base class"""
|
"""Publisher base class."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class TopicAdapterConsumer(AdapterConsumer):
|
class TopicAdapterConsumer(AdapterConsumer):
|
||||||
"""Consumes messages on a specific topic"""
|
"""Consumes messages on a specific topic."""
|
||||||
exchange_type = "topic"
|
|
||||||
|
|
||||||
def __init__(self, connection=None, topic="broadcast", proxy=None):
|
exchange_type = 'topic'
|
||||||
|
|
||||||
|
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
||||||
self.queue = topic
|
self.queue = topic
|
||||||
self.routing_key = topic
|
self.routing_key = topic
|
||||||
self.exchange = FLAGS.control_exchange
|
self.exchange = FLAGS.control_exchange
|
||||||
@@ -214,27 +225,29 @@ class TopicAdapterConsumer(AdapterConsumer):
|
|||||||
|
|
||||||
|
|
||||||
class FanoutAdapterConsumer(AdapterConsumer):
|
class FanoutAdapterConsumer(AdapterConsumer):
|
||||||
"""Consumes messages from a fanout exchange"""
|
"""Consumes messages from a fanout exchange."""
|
||||||
exchange_type = "fanout"
|
|
||||||
|
|
||||||
def __init__(self, connection=None, topic="broadcast", proxy=None):
|
exchange_type = 'fanout'
|
||||||
self.exchange = "%s_fanout" % topic
|
|
||||||
|
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
||||||
|
self.exchange = '%s_fanout' % topic
|
||||||
self.routing_key = topic
|
self.routing_key = topic
|
||||||
unique = uuid.uuid4().hex
|
unique = uuid.uuid4().hex
|
||||||
self.queue = "%s_fanout_%s" % (topic, unique)
|
self.queue = '%s_fanout_%s' % (topic, unique)
|
||||||
self.durable = False
|
self.durable = False
|
||||||
LOG.info(_("Created '%(exchange)s' fanout exchange "
|
LOG.info(_('Created "%(exchange)s" fanout exchange '
|
||||||
"with '%(key)s' routing key"),
|
'with "%(key)s" routing key'),
|
||||||
dict(exchange=self.exchange, key=self.routing_key))
|
dict(exchange=self.exchange, key=self.routing_key))
|
||||||
super(FanoutAdapterConsumer, self).__init__(connection=connection,
|
super(FanoutAdapterConsumer, self).__init__(connection=connection,
|
||||||
topic=topic, proxy=proxy)
|
topic=topic, proxy=proxy)
|
||||||
|
|
||||||
|
|
||||||
class TopicPublisher(Publisher):
|
class TopicPublisher(Publisher):
|
||||||
"""Publishes messages on a specific topic"""
|
"""Publishes messages on a specific topic."""
|
||||||
exchange_type = "topic"
|
|
||||||
|
|
||||||
def __init__(self, connection=None, topic="broadcast"):
|
exchange_type = 'topic'
|
||||||
|
|
||||||
|
def __init__(self, connection=None, topic='broadcast'):
|
||||||
self.routing_key = topic
|
self.routing_key = topic
|
||||||
self.exchange = FLAGS.control_exchange
|
self.exchange = FLAGS.control_exchange
|
||||||
self.durable = False
|
self.durable = False
|
||||||
@@ -243,20 +256,22 @@ class TopicPublisher(Publisher):
|
|||||||
|
|
||||||
class FanoutPublisher(Publisher):
|
class FanoutPublisher(Publisher):
|
||||||
"""Publishes messages to a fanout exchange."""
|
"""Publishes messages to a fanout exchange."""
|
||||||
exchange_type = "fanout"
|
|
||||||
|
exchange_type = 'fanout'
|
||||||
|
|
||||||
def __init__(self, topic, connection=None):
|
def __init__(self, topic, connection=None):
|
||||||
self.exchange = "%s_fanout" % topic
|
self.exchange = '%s_fanout' % topic
|
||||||
self.queue = "%s_fanout" % topic
|
self.queue = '%s_fanout' % topic
|
||||||
self.durable = False
|
self.durable = False
|
||||||
LOG.info(_("Creating '%(exchange)s' fanout exchange"),
|
LOG.info(_('Creating "%(exchange)s" fanout exchange'),
|
||||||
dict(exchange=self.exchange))
|
dict(exchange=self.exchange))
|
||||||
super(FanoutPublisher, self).__init__(connection=connection)
|
super(FanoutPublisher, self).__init__(connection=connection)
|
||||||
|
|
||||||
|
|
||||||
class DirectConsumer(Consumer):
|
class DirectConsumer(Consumer):
|
||||||
"""Consumes messages directly on a channel specified by msg_id"""
|
"""Consumes messages directly on a channel specified by msg_id."""
|
||||||
exchange_type = "direct"
|
|
||||||
|
exchange_type = 'direct'
|
||||||
|
|
||||||
def __init__(self, connection=None, msg_id=None):
|
def __init__(self, connection=None, msg_id=None):
|
||||||
self.queue = msg_id
|
self.queue = msg_id
|
||||||
@@ -268,8 +283,9 @@ class DirectConsumer(Consumer):
|
|||||||
|
|
||||||
|
|
||||||
class DirectPublisher(Publisher):
|
class DirectPublisher(Publisher):
|
||||||
"""Publishes messages directly on a channel specified by msg_id"""
|
"""Publishes messages directly on a channel specified by msg_id."""
|
||||||
exchange_type = "direct"
|
|
||||||
|
exchange_type = 'direct'
|
||||||
|
|
||||||
def __init__(self, connection=None, msg_id=None):
|
def __init__(self, connection=None, msg_id=None):
|
||||||
self.routing_key = msg_id
|
self.routing_key = msg_id
|
||||||
@@ -279,9 +295,9 @@ class DirectPublisher(Publisher):
|
|||||||
|
|
||||||
|
|
||||||
def msg_reply(msg_id, reply=None, failure=None):
|
def msg_reply(msg_id, reply=None, failure=None):
|
||||||
"""Sends a reply or an error on the channel signified by msg_id
|
"""Sends a reply or an error on the channel signified by msg_id.
|
||||||
|
|
||||||
failure should be a sys.exc_info() tuple.
|
Failure should be a sys.exc_info() tuple.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
if failure:
|
if failure:
|
||||||
@@ -303,17 +319,20 @@ def msg_reply(msg_id, reply=None, failure=None):
|
|||||||
|
|
||||||
|
|
||||||
class RemoteError(exception.Error):
|
class RemoteError(exception.Error):
|
||||||
"""Signifies that a remote class has raised an exception
|
"""Signifies that a remote class has raised an exception.
|
||||||
|
|
||||||
Containes a string representation of the type of the original exception,
|
Containes a string representation of the type of the original exception,
|
||||||
the value of the original exception, and the traceback. These are
|
the value of the original exception, and the traceback. These are
|
||||||
sent to the parent as a joined string so printing the exception
|
sent to the parent as a joined string so printing the exception
|
||||||
contains all of the relevent info."""
|
contains all of the relevent info.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, exc_type, value, traceback):
|
def __init__(self, exc_type, value, traceback):
|
||||||
self.exc_type = exc_type
|
self.exc_type = exc_type
|
||||||
self.value = value
|
self.value = value
|
||||||
self.traceback = traceback
|
self.traceback = traceback
|
||||||
super(RemoteError, self).__init__("%s %s\n%s" % (exc_type,
|
super(RemoteError, self).__init__('%s %s\n%s' % (exc_type,
|
||||||
value,
|
value,
|
||||||
traceback))
|
traceback))
|
||||||
|
|
||||||
@@ -339,6 +358,7 @@ def _pack_context(msg, context):
|
|||||||
context out into a bunch of separate keys. If we want to support
|
context out into a bunch of separate keys. If we want to support
|
||||||
more arguments in rabbit messages, we may want to do the same
|
more arguments in rabbit messages, we may want to do the same
|
||||||
for args at some point.
|
for args at some point.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
context = dict([('_context_%s' % key, value)
|
context = dict([('_context_%s' % key, value)
|
||||||
for (key, value) in context.to_dict().iteritems()])
|
for (key, value) in context.to_dict().iteritems()])
|
||||||
@@ -346,11 +366,11 @@ def _pack_context(msg, context):
|
|||||||
|
|
||||||
|
|
||||||
def call(context, topic, msg):
|
def call(context, topic, msg):
|
||||||
"""Sends a message on a topic and wait for a response"""
|
"""Sends a message on a topic and wait for a response."""
|
||||||
LOG.debug(_("Making asynchronous call on %s ..."), topic)
|
LOG.debug(_('Making asynchronous call on %s ...'), topic)
|
||||||
msg_id = uuid.uuid4().hex
|
msg_id = uuid.uuid4().hex
|
||||||
msg.update({'_msg_id': msg_id})
|
msg.update({'_msg_id': msg_id})
|
||||||
LOG.debug(_("MSG_ID is %s") % (msg_id))
|
LOG.debug(_('MSG_ID is %s') % (msg_id))
|
||||||
_pack_context(msg, context)
|
_pack_context(msg, context)
|
||||||
|
|
||||||
class WaitMessage(object):
|
class WaitMessage(object):
|
||||||
@@ -387,8 +407,8 @@ def call(context, topic, msg):
|
|||||||
|
|
||||||
|
|
||||||
def cast(context, topic, msg):
|
def cast(context, topic, msg):
|
||||||
"""Sends a message on a topic without waiting for a response"""
|
"""Sends a message on a topic without waiting for a response."""
|
||||||
LOG.debug(_("Making asynchronous cast on %s..."), topic)
|
LOG.debug(_('Making asynchronous cast on %s...'), topic)
|
||||||
_pack_context(msg, context)
|
_pack_context(msg, context)
|
||||||
conn = Connection.instance()
|
conn = Connection.instance()
|
||||||
publisher = TopicPublisher(connection=conn, topic=topic)
|
publisher = TopicPublisher(connection=conn, topic=topic)
|
||||||
@@ -397,8 +417,8 @@ def cast(context, topic, msg):
|
|||||||
|
|
||||||
|
|
||||||
def fanout_cast(context, topic, msg):
|
def fanout_cast(context, topic, msg):
|
||||||
"""Sends a message on a fanout exchange without waiting for a response"""
|
"""Sends a message on a fanout exchange without waiting for a response."""
|
||||||
LOG.debug(_("Making asynchronous fanout cast..."))
|
LOG.debug(_('Making asynchronous fanout cast...'))
|
||||||
_pack_context(msg, context)
|
_pack_context(msg, context)
|
||||||
conn = Connection.instance()
|
conn = Connection.instance()
|
||||||
publisher = FanoutPublisher(topic, connection=conn)
|
publisher = FanoutPublisher(topic, connection=conn)
|
||||||
@@ -407,14 +427,14 @@ def fanout_cast(context, topic, msg):
|
|||||||
|
|
||||||
|
|
||||||
def generic_response(message_data, message):
|
def generic_response(message_data, message):
|
||||||
"""Logs a result and exits"""
|
"""Logs a result and exits."""
|
||||||
LOG.debug(_('response %s'), message_data)
|
LOG.debug(_('response %s'), message_data)
|
||||||
message.ack()
|
message.ack()
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
||||||
def send_message(topic, message, wait=True):
|
def send_message(topic, message, wait=True):
|
||||||
"""Sends a message for testing"""
|
"""Sends a message for testing."""
|
||||||
msg_id = uuid.uuid4().hex
|
msg_id = uuid.uuid4().hex
|
||||||
message.update({'_msg_id': msg_id})
|
message.update({'_msg_id': msg_id})
|
||||||
LOG.debug(_('topic is %s'), topic)
|
LOG.debug(_('topic is %s'), topic)
|
||||||
@@ -425,14 +445,14 @@ def send_message(topic, message, wait=True):
|
|||||||
queue=msg_id,
|
queue=msg_id,
|
||||||
exchange=msg_id,
|
exchange=msg_id,
|
||||||
auto_delete=True,
|
auto_delete=True,
|
||||||
exchange_type="direct",
|
exchange_type='direct',
|
||||||
routing_key=msg_id)
|
routing_key=msg_id)
|
||||||
consumer.register_callback(generic_response)
|
consumer.register_callback(generic_response)
|
||||||
|
|
||||||
publisher = messaging.Publisher(connection=Connection.instance(),
|
publisher = messaging.Publisher(connection=Connection.instance(),
|
||||||
exchange=FLAGS.control_exchange,
|
exchange=FLAGS.control_exchange,
|
||||||
durable=False,
|
durable=False,
|
||||||
exchange_type="topic",
|
exchange_type='topic',
|
||||||
routing_key=topic)
|
routing_key=topic)
|
||||||
publisher.send(message)
|
publisher.send(message)
|
||||||
publisher.close()
|
publisher.close()
|
||||||
@@ -441,8 +461,8 @@ def send_message(topic, message, wait=True):
|
|||||||
consumer.wait()
|
consumer.wait()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == '__main__':
|
||||||
# NOTE(vish): you can send messages from the command line using
|
# You can send messages from the command line using
|
||||||
# topic and a json sting representing a dictionary
|
# topic and a json string representing a dictionary
|
||||||
# for the method
|
# for the method
|
||||||
send_message(sys.argv[1], json.loads(sys.argv[2]))
|
send_message(sys.argv[1], json.loads(sys.argv[2]))
|
||||||
|
|||||||
Reference in New Issue
Block a user