merge trunk, resolved conflict
This commit is contained in:
1
Authors
1
Authors
@@ -30,6 +30,7 @@ Ilya Alekseyev <ialekseev@griddynamics.com>
|
||||
Jason Koelker <jason@koelker.net>
|
||||
Jay Pipes <jaypipes@gmail.com>
|
||||
Jesse Andrews <anotherjesse@gmail.com>
|
||||
Jimmy Bergman <jimmy@sigint.se>
|
||||
Joe Heck <heckj@mac.com>
|
||||
Joel Moore <joelbm24@gmail.com>
|
||||
Johannes Erdfelt <johannes.erdfelt@rackspace.com>
|
||||
|
17
HACKING
17
HACKING
@@ -50,17 +50,24 @@ Human Alphabetical Order Examples
|
||||
|
||||
Docstrings
|
||||
----------
|
||||
"""Summary of the function, class or method, less than 80 characters.
|
||||
"""A one line docstring looks like this and ends in a period."""
|
||||
|
||||
New paragraph after newline that explains in more detail any general
|
||||
information about the function, class or method. After this, if defining
|
||||
parameters and return types use the Sphinx format. After that an extra
|
||||
newline then close the quotations.
|
||||
|
||||
"""A multiline docstring has a one-line summary, less than 80 characters.
|
||||
|
||||
Then a new paragraph after a newline that explains in more detail any
|
||||
general information about the function, class or method. Example usages
|
||||
are also great to have here if it is a complex class for function. After
|
||||
you have finished your descriptions add an extra newline and close the
|
||||
quotations.
|
||||
|
||||
When writing the docstring for a class, an extra line should be placed
|
||||
after the closing quotations. For more in-depth explanations for these
|
||||
decisions see http://www.python.org/dev/peps/pep-0257/
|
||||
|
||||
If you are going to describe parameters and return values, use Sphinx, the
|
||||
appropriate syntax is as follows.
|
||||
|
||||
:param foo: the foo parameter
|
||||
:param bar: the bar parameter
|
||||
:returns: description of the return value
|
||||
|
@@ -28,11 +28,11 @@ import sys
|
||||
|
||||
# If ../nova/__init__.py exists, add ../ to Python search path, so that
|
||||
# it will override what happens to be installed in /usr/(local/)lib/python...
|
||||
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||
os.pardir,
|
||||
os.pardir))
|
||||
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
|
||||
sys.path.insert(0, possible_topdir)
|
||||
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'nova', '__init__.py')):
|
||||
sys.path.insert(0, POSSIBLE_TOPDIR)
|
||||
|
||||
gettext.install('nova', unicode=1)
|
||||
|
||||
|
@@ -58,7 +58,6 @@ import gettext
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
@@ -66,11 +65,11 @@ import IPy
|
||||
|
||||
# If ../nova/__init__.py exists, add ../ to Python search path, so that
|
||||
# it will override what happens to be installed in /usr/(local/)lib/python...
|
||||
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||
os.pardir,
|
||||
os.pardir))
|
||||
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
|
||||
sys.path.insert(0, possible_topdir)
|
||||
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'nova', '__init__.py')):
|
||||
sys.path.insert(0, POSSIBLE_TOPDIR)
|
||||
|
||||
gettext.install('nova', unicode=1)
|
||||
|
||||
@@ -809,12 +808,11 @@ class VolumeCommands(object):
|
||||
class InstanceTypeCommands(object):
|
||||
"""Class for managing instance types / flavors."""
|
||||
|
||||
def _print_instance_types(self, n, val):
|
||||
"""helper method to print out instance_types values"""
|
||||
def _print_instance_types(self, name, val):
|
||||
deleted = ('', ', inactive')[val["deleted"] == 1]
|
||||
print ("%s: Memory: %sMB, VCPUS: %s, Storage: %sGB, FlavorID: %s, "
|
||||
"Swap: %sGB, RXTX Quota: %sGB, RXTX Cap: %sMB%s") % (
|
||||
n, val["memory_mb"], val["vcpus"], val["local_gb"],
|
||||
name, val["memory_mb"], val["vcpus"], val["local_gb"],
|
||||
val["flavorid"], val["swap"], val["rxtx_quota"],
|
||||
val["rxtx_cap"], deleted)
|
||||
|
||||
@@ -1028,7 +1026,7 @@ class ImageCommands(object):
|
||||
machine_images[image_path] = image_metadata
|
||||
else:
|
||||
other_images[image_path] = image_metadata
|
||||
except Exception as exc:
|
||||
except Exception:
|
||||
print _("Failed to load %(fn)s.") % locals()
|
||||
# NOTE(vish): do kernels and ramdisks first so images
|
||||
self._convert_images(other_images)
|
||||
|
@@ -18,14 +18,14 @@
|
||||
|
||||
"""Super simple fake memcache client."""
|
||||
|
||||
import utils
|
||||
from nova import utils
|
||||
|
||||
|
||||
class Client(object):
|
||||
"""Replicates a tiny subset of memcached client interface."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""Ignores the passed in args"""
|
||||
"""Ignores the passed in args."""
|
||||
self.cache = {}
|
||||
|
||||
def get(self, key):
|
||||
|
@@ -16,9 +16,13 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
"""Command-line flag library.
|
||||
|
||||
Wraps gflags.
|
||||
|
||||
Package-level global flags are defined here, the rest are defined
|
||||
where they're used.
|
||||
|
||||
"""
|
||||
|
||||
import getopt
|
||||
@@ -145,10 +149,12 @@ class FlagValues(gflags.FlagValues):
|
||||
|
||||
|
||||
class StrWrapper(object):
|
||||
"""Wrapper around FlagValues objects
|
||||
"""Wrapper around FlagValues objects.
|
||||
|
||||
Wraps FlagValues objects for string.Template so that we're
|
||||
sure to return strings."""
|
||||
sure to return strings.
|
||||
|
||||
"""
|
||||
def __init__(self, context_objs):
|
||||
self.context_objs = context_objs
|
||||
|
||||
@@ -169,6 +175,7 @@ def _GetCallingModule():
|
||||
|
||||
We generally use this function to get the name of the module calling a
|
||||
DEFINE_foo... function.
|
||||
|
||||
"""
|
||||
# Walk down the stack to find the first globals dict that's not ours.
|
||||
for depth in range(1, sys.getrecursionlimit()):
|
||||
@@ -192,6 +199,7 @@ def __GetModuleName(globals_dict):
|
||||
Returns:
|
||||
A string (the name of the module) or None (if the module could not
|
||||
be identified.
|
||||
|
||||
"""
|
||||
for name, module in sys.modules.iteritems():
|
||||
if getattr(module, '__dict__', None) is globals_dict:
|
||||
@@ -316,7 +324,7 @@ DEFINE_string('null_kernel', 'nokernel',
|
||||
'kernel image that indicates not to use a kernel,'
|
||||
' but to use a raw disk image instead')
|
||||
|
||||
DEFINE_string('vpn_image_id', 'ami-cloudpipe', 'AMI for cloudpipe vpn server')
|
||||
DEFINE_integer('vpn_image_id', 0, 'integer id for cloudpipe vpn server')
|
||||
DEFINE_string('vpn_key_suffix',
|
||||
'-vpn',
|
||||
'Suffix to add to project name for vpn key and secgroups')
|
||||
@@ -326,7 +334,7 @@ DEFINE_integer('auth_token_ttl', 3600, 'Seconds for auth tokens to linger')
|
||||
DEFINE_string('state_path', os.path.join(os.path.dirname(__file__), '../'),
|
||||
"Top-level directory for maintaining nova's state")
|
||||
DEFINE_string('lock_path', os.path.join(os.path.dirname(__file__), '../'),
|
||||
"Directory for lock files")
|
||||
'Directory for lock files')
|
||||
DEFINE_string('logdir', None, 'output to a per-service log file in named '
|
||||
'directory')
|
||||
|
||||
|
49
nova/log.py
49
nova/log.py
@@ -16,16 +16,15 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Nova logging handler.
|
||||
"""Nova logging handler.
|
||||
|
||||
This module adds to logging functionality by adding the option to specify
|
||||
a context object when calling the various log methods. If the context object
|
||||
is not specified, default formatting is used.
|
||||
|
||||
It also allows setting of formatting information through flags.
|
||||
"""
|
||||
|
||||
"""
|
||||
|
||||
import cStringIO
|
||||
import inspect
|
||||
@@ -41,34 +40,28 @@ from nova import version
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
|
||||
flags.DEFINE_string('logging_context_format_string',
|
||||
'%(asctime)s %(levelname)s %(name)s '
|
||||
'[%(request_id)s %(user)s '
|
||||
'%(project)s] %(message)s',
|
||||
'format string to use for log messages with context')
|
||||
|
||||
flags.DEFINE_string('logging_default_format_string',
|
||||
'%(asctime)s %(levelname)s %(name)s [-] '
|
||||
'%(message)s',
|
||||
'format string to use for log messages without context')
|
||||
|
||||
flags.DEFINE_string('logging_debug_format_suffix',
|
||||
'from (pid=%(process)d) %(funcName)s'
|
||||
' %(pathname)s:%(lineno)d',
|
||||
'data to append to log format when level is DEBUG')
|
||||
|
||||
flags.DEFINE_string('logging_exception_prefix',
|
||||
'(%(name)s): TRACE: ',
|
||||
'prefix each line of exception output with this format')
|
||||
|
||||
flags.DEFINE_list('default_log_levels',
|
||||
['amqplib=WARN',
|
||||
'sqlalchemy=WARN',
|
||||
'boto=WARN',
|
||||
'eventlet.wsgi.server=WARN'],
|
||||
'list of logger=LEVEL pairs')
|
||||
|
||||
flags.DEFINE_bool('use_syslog', False, 'output to syslog')
|
||||
flags.DEFINE_string('logfile', None, 'output to named file')
|
||||
|
||||
@@ -83,6 +76,8 @@ WARN = logging.WARN
|
||||
INFO = logging.INFO
|
||||
DEBUG = logging.DEBUG
|
||||
NOTSET = logging.NOTSET
|
||||
|
||||
|
||||
# methods
|
||||
getLogger = logging.getLogger
|
||||
debug = logging.debug
|
||||
@@ -93,6 +88,8 @@ error = logging.error
|
||||
exception = logging.exception
|
||||
critical = logging.critical
|
||||
log = logging.log
|
||||
|
||||
|
||||
# handlers
|
||||
StreamHandler = logging.StreamHandler
|
||||
WatchedFileHandler = logging.handlers.WatchedFileHandler
|
||||
@@ -127,17 +124,18 @@ def _get_log_file_path(binary=None):
|
||||
|
||||
|
||||
class NovaLogger(logging.Logger):
|
||||
"""
|
||||
NovaLogger manages request context and formatting.
|
||||
"""NovaLogger manages request context and formatting.
|
||||
|
||||
This becomes the class that is instanciated by logging.getLogger.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, name, level=NOTSET):
|
||||
logging.Logger.__init__(self, name, level)
|
||||
self.setup_from_flags()
|
||||
|
||||
def setup_from_flags(self):
|
||||
"""Setup logger from flags"""
|
||||
"""Setup logger from flags."""
|
||||
level = NOTSET
|
||||
for pair in FLAGS.default_log_levels:
|
||||
logger, _sep, level_name = pair.partition('=')
|
||||
@@ -148,7 +146,7 @@ class NovaLogger(logging.Logger):
|
||||
self.setLevel(level)
|
||||
|
||||
def _log(self, level, msg, args, exc_info=None, extra=None, context=None):
|
||||
"""Extract context from any log call"""
|
||||
"""Extract context from any log call."""
|
||||
if not extra:
|
||||
extra = {}
|
||||
if context:
|
||||
@@ -157,17 +155,17 @@ class NovaLogger(logging.Logger):
|
||||
return logging.Logger._log(self, level, msg, args, exc_info, extra)
|
||||
|
||||
def addHandler(self, handler):
|
||||
"""Each handler gets our custom formatter"""
|
||||
"""Each handler gets our custom formatter."""
|
||||
handler.setFormatter(_formatter)
|
||||
return logging.Logger.addHandler(self, handler)
|
||||
|
||||
def audit(self, msg, *args, **kwargs):
|
||||
"""Shortcut for our AUDIT level"""
|
||||
"""Shortcut for our AUDIT level."""
|
||||
if self.isEnabledFor(AUDIT):
|
||||
self._log(AUDIT, msg, args, **kwargs)
|
||||
|
||||
def exception(self, msg, *args, **kwargs):
|
||||
"""Logging.exception doesn't handle kwargs, so breaks context"""
|
||||
"""Logging.exception doesn't handle kwargs, so breaks context."""
|
||||
if not kwargs.get('exc_info'):
|
||||
kwargs['exc_info'] = 1
|
||||
self.error(msg, *args, **kwargs)
|
||||
@@ -181,14 +179,13 @@ class NovaLogger(logging.Logger):
|
||||
for k in env.keys():
|
||||
if not isinstance(env[k], str):
|
||||
env.pop(k)
|
||||
message = "Environment: %s" % json.dumps(env)
|
||||
message = 'Environment: %s' % json.dumps(env)
|
||||
kwargs.pop('exc_info')
|
||||
self.error(message, **kwargs)
|
||||
|
||||
|
||||
class NovaFormatter(logging.Formatter):
|
||||
"""
|
||||
A nova.context.RequestContext aware formatter configured through flags.
|
||||
"""A nova.context.RequestContext aware formatter configured through flags.
|
||||
|
||||
The flags used to set format strings are: logging_context_foramt_string
|
||||
and logging_default_format_string. You can also specify
|
||||
@@ -197,10 +194,11 @@ class NovaFormatter(logging.Formatter):
|
||||
|
||||
For information about what variables are available for the formatter see:
|
||||
http://docs.python.org/library/logging.html#formatter
|
||||
|
||||
"""
|
||||
|
||||
def format(self, record):
|
||||
"""Uses contextstring if request_id is set, otherwise default"""
|
||||
"""Uses contextstring if request_id is set, otherwise default."""
|
||||
if record.__dict__.get('request_id', None):
|
||||
self._fmt = FLAGS.logging_context_format_string
|
||||
else:
|
||||
@@ -214,20 +212,21 @@ class NovaFormatter(logging.Formatter):
|
||||
return logging.Formatter.format(self, record)
|
||||
|
||||
def formatException(self, exc_info, record=None):
|
||||
"""Format exception output with FLAGS.logging_exception_prefix"""
|
||||
"""Format exception output with FLAGS.logging_exception_prefix."""
|
||||
if not record:
|
||||
return logging.Formatter.formatException(self, exc_info)
|
||||
stringbuffer = cStringIO.StringIO()
|
||||
traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
|
||||
None, stringbuffer)
|
||||
lines = stringbuffer.getvalue().split("\n")
|
||||
lines = stringbuffer.getvalue().split('\n')
|
||||
stringbuffer.close()
|
||||
formatted_lines = []
|
||||
for line in lines:
|
||||
pl = FLAGS.logging_exception_prefix % record.__dict__
|
||||
fl = "%s%s" % (pl, line)
|
||||
fl = '%s%s' % (pl, line)
|
||||
formatted_lines.append(fl)
|
||||
return "\n".join(formatted_lines)
|
||||
return '\n'.join(formatted_lines)
|
||||
|
||||
|
||||
_formatter = NovaFormatter()
|
||||
|
||||
@@ -241,7 +240,7 @@ class NovaRootLogger(NovaLogger):
|
||||
NovaLogger.__init__(self, name, level)
|
||||
|
||||
def setup_from_flags(self):
|
||||
"""Setup logger from flags"""
|
||||
"""Setup logger from flags."""
|
||||
global _filelog
|
||||
if FLAGS.use_syslog:
|
||||
self.syslog = SysLogHandler(address='/dev/log')
|
||||
|
152
nova/rpc.py
152
nova/rpc.py
@@ -16,9 +16,12 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
AMQP-based RPC. Queues have consumers and publishers.
|
||||
"""AMQP-based RPC.
|
||||
|
||||
Queues have consumers and publishers.
|
||||
|
||||
No fan-out support yet.
|
||||
|
||||
"""
|
||||
|
||||
import json
|
||||
@@ -40,17 +43,19 @@ from nova import log as logging
|
||||
from nova import utils
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
LOG = logging.getLogger('nova.rpc')
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool')
|
||||
|
||||
|
||||
class Connection(carrot_connection.BrokerConnection):
|
||||
"""Connection instance object"""
|
||||
"""Connection instance object."""
|
||||
|
||||
@classmethod
|
||||
def instance(cls, new=True):
|
||||
"""Returns the instance"""
|
||||
"""Returns the instance."""
|
||||
if new or not hasattr(cls, '_instance'):
|
||||
params = dict(hostname=FLAGS.rabbit_host,
|
||||
port=FLAGS.rabbit_port,
|
||||
@@ -71,9 +76,11 @@ class Connection(carrot_connection.BrokerConnection):
|
||||
|
||||
@classmethod
|
||||
def recreate(cls):
|
||||
"""Recreates the connection instance
|
||||
"""Recreates the connection instance.
|
||||
|
||||
This is necessary to recover from some network errors/disconnects"""
|
||||
This is necessary to recover from some network errors/disconnects.
|
||||
|
||||
"""
|
||||
try:
|
||||
del cls._instance
|
||||
except AttributeError, e:
|
||||
@@ -84,10 +91,12 @@ class Connection(carrot_connection.BrokerConnection):
|
||||
|
||||
|
||||
class Consumer(messaging.Consumer):
|
||||
"""Consumer base class
|
||||
"""Consumer base class.
|
||||
|
||||
Contains methods for connecting the fetch method to async loops.
|
||||
|
||||
Contains methods for connecting the fetch method to async loops
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
for i in xrange(FLAGS.rabbit_max_retries):
|
||||
if i > 0:
|
||||
@@ -100,19 +109,18 @@ class Consumer(messaging.Consumer):
|
||||
fl_host = FLAGS.rabbit_host
|
||||
fl_port = FLAGS.rabbit_port
|
||||
fl_intv = FLAGS.rabbit_retry_interval
|
||||
LOG.error(_("AMQP server on %(fl_host)s:%(fl_port)d is"
|
||||
" unreachable: %(e)s. Trying again in %(fl_intv)d"
|
||||
" seconds.")
|
||||
% locals())
|
||||
LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
|
||||
' unreachable: %(e)s. Trying again in %(fl_intv)d'
|
||||
' seconds.') % locals())
|
||||
self.failed_connection = True
|
||||
if self.failed_connection:
|
||||
LOG.error(_("Unable to connect to AMQP server "
|
||||
"after %d tries. Shutting down."),
|
||||
LOG.error(_('Unable to connect to AMQP server '
|
||||
'after %d tries. Shutting down.'),
|
||||
FLAGS.rabbit_max_retries)
|
||||
sys.exit(1)
|
||||
|
||||
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
||||
"""Wraps the parent fetch with some logic for failed connections"""
|
||||
"""Wraps the parent fetch with some logic for failed connection."""
|
||||
# TODO(vish): the logic for failed connections and logging should be
|
||||
# refactored into some sort of connection manager object
|
||||
try:
|
||||
@@ -125,14 +133,14 @@ class Consumer(messaging.Consumer):
|
||||
self.declare()
|
||||
super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
|
||||
if self.failed_connection:
|
||||
LOG.error(_("Reconnected to queue"))
|
||||
LOG.error(_('Reconnected to queue'))
|
||||
self.failed_connection = False
|
||||
# NOTE(vish): This is catching all errors because we really don't
|
||||
# want exceptions to be logged 10 times a second if some
|
||||
# persistent failure occurs.
|
||||
except Exception, e: # pylint: disable=W0703
|
||||
if not self.failed_connection:
|
||||
LOG.exception(_("Failed to fetch message from queue: %s" % e))
|
||||
LOG.exception(_('Failed to fetch message from queue: %s' % e))
|
||||
self.failed_connection = True
|
||||
|
||||
def attach_to_eventlet(self):
|
||||
@@ -143,8 +151,9 @@ class Consumer(messaging.Consumer):
|
||||
|
||||
|
||||
class AdapterConsumer(Consumer):
|
||||
"""Calls methods on a proxy object based on method and args"""
|
||||
def __init__(self, connection=None, topic="broadcast", proxy=None):
|
||||
"""Calls methods on a proxy object based on method and args."""
|
||||
|
||||
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
||||
LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
|
||||
self.proxy = proxy
|
||||
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
|
||||
@@ -156,13 +165,14 @@ class AdapterConsumer(Consumer):
|
||||
|
||||
@exception.wrap_exception
|
||||
def _receive(self, message_data, message):
|
||||
"""Magically looks for a method on the proxy object and calls it
|
||||
"""Magically looks for a method on the proxy object and calls it.
|
||||
|
||||
Message data should be a dictionary with two keys:
|
||||
method: string representing the method to call
|
||||
args: dictionary of arg: value
|
||||
|
||||
Example: {'method': 'echo', 'args': {'value': 42}}
|
||||
|
||||
"""
|
||||
LOG.debug(_('received %s') % message_data)
|
||||
msg_id = message_data.pop('_msg_id', None)
|
||||
@@ -189,22 +199,23 @@ class AdapterConsumer(Consumer):
|
||||
if msg_id:
|
||||
msg_reply(msg_id, rval, None)
|
||||
except Exception as e:
|
||||
logging.exception("Exception during message handling")
|
||||
logging.exception('Exception during message handling')
|
||||
if msg_id:
|
||||
msg_reply(msg_id, None, sys.exc_info())
|
||||
return
|
||||
|
||||
|
||||
class Publisher(messaging.Publisher):
|
||||
"""Publisher base class"""
|
||||
"""Publisher base class."""
|
||||
pass
|
||||
|
||||
|
||||
class TopicAdapterConsumer(AdapterConsumer):
|
||||
"""Consumes messages on a specific topic"""
|
||||
exchange_type = "topic"
|
||||
"""Consumes messages on a specific topic."""
|
||||
|
||||
def __init__(self, connection=None, topic="broadcast", proxy=None):
|
||||
exchange_type = 'topic'
|
||||
|
||||
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
||||
self.queue = topic
|
||||
self.routing_key = topic
|
||||
self.exchange = FLAGS.control_exchange
|
||||
@@ -214,27 +225,29 @@ class TopicAdapterConsumer(AdapterConsumer):
|
||||
|
||||
|
||||
class FanoutAdapterConsumer(AdapterConsumer):
|
||||
"""Consumes messages from a fanout exchange"""
|
||||
exchange_type = "fanout"
|
||||
"""Consumes messages from a fanout exchange."""
|
||||
|
||||
def __init__(self, connection=None, topic="broadcast", proxy=None):
|
||||
self.exchange = "%s_fanout" % topic
|
||||
exchange_type = 'fanout'
|
||||
|
||||
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
||||
self.exchange = '%s_fanout' % topic
|
||||
self.routing_key = topic
|
||||
unique = uuid.uuid4().hex
|
||||
self.queue = "%s_fanout_%s" % (topic, unique)
|
||||
self.queue = '%s_fanout_%s' % (topic, unique)
|
||||
self.durable = False
|
||||
LOG.info(_("Created '%(exchange)s' fanout exchange "
|
||||
"with '%(key)s' routing key"),
|
||||
dict(exchange=self.exchange, key=self.routing_key))
|
||||
LOG.info(_('Created "%(exchange)s" fanout exchange '
|
||||
'with "%(key)s" routing key'),
|
||||
dict(exchange=self.exchange, key=self.routing_key))
|
||||
super(FanoutAdapterConsumer, self).__init__(connection=connection,
|
||||
topic=topic, proxy=proxy)
|
||||
|
||||
|
||||
class TopicPublisher(Publisher):
|
||||
"""Publishes messages on a specific topic"""
|
||||
exchange_type = "topic"
|
||||
"""Publishes messages on a specific topic."""
|
||||
|
||||
def __init__(self, connection=None, topic="broadcast"):
|
||||
exchange_type = 'topic'
|
||||
|
||||
def __init__(self, connection=None, topic='broadcast'):
|
||||
self.routing_key = topic
|
||||
self.exchange = FLAGS.control_exchange
|
||||
self.durable = False
|
||||
@@ -243,20 +256,22 @@ class TopicPublisher(Publisher):
|
||||
|
||||
class FanoutPublisher(Publisher):
|
||||
"""Publishes messages to a fanout exchange."""
|
||||
exchange_type = "fanout"
|
||||
|
||||
exchange_type = 'fanout'
|
||||
|
||||
def __init__(self, topic, connection=None):
|
||||
self.exchange = "%s_fanout" % topic
|
||||
self.queue = "%s_fanout" % topic
|
||||
self.exchange = '%s_fanout' % topic
|
||||
self.queue = '%s_fanout' % topic
|
||||
self.durable = False
|
||||
LOG.info(_("Creating '%(exchange)s' fanout exchange"),
|
||||
dict(exchange=self.exchange))
|
||||
LOG.info(_('Creating "%(exchange)s" fanout exchange'),
|
||||
dict(exchange=self.exchange))
|
||||
super(FanoutPublisher, self).__init__(connection=connection)
|
||||
|
||||
|
||||
class DirectConsumer(Consumer):
|
||||
"""Consumes messages directly on a channel specified by msg_id"""
|
||||
exchange_type = "direct"
|
||||
"""Consumes messages directly on a channel specified by msg_id."""
|
||||
|
||||
exchange_type = 'direct'
|
||||
|
||||
def __init__(self, connection=None, msg_id=None):
|
||||
self.queue = msg_id
|
||||
@@ -268,8 +283,9 @@ class DirectConsumer(Consumer):
|
||||
|
||||
|
||||
class DirectPublisher(Publisher):
|
||||
"""Publishes messages directly on a channel specified by msg_id"""
|
||||
exchange_type = "direct"
|
||||
"""Publishes messages directly on a channel specified by msg_id."""
|
||||
|
||||
exchange_type = 'direct'
|
||||
|
||||
def __init__(self, connection=None, msg_id=None):
|
||||
self.routing_key = msg_id
|
||||
@@ -279,9 +295,9 @@ class DirectPublisher(Publisher):
|
||||
|
||||
|
||||
def msg_reply(msg_id, reply=None, failure=None):
|
||||
"""Sends a reply or an error on the channel signified by msg_id
|
||||
"""Sends a reply or an error on the channel signified by msg_id.
|
||||
|
||||
failure should be a sys.exc_info() tuple.
|
||||
Failure should be a sys.exc_info() tuple.
|
||||
|
||||
"""
|
||||
if failure:
|
||||
@@ -303,17 +319,20 @@ def msg_reply(msg_id, reply=None, failure=None):
|
||||
|
||||
|
||||
class RemoteError(exception.Error):
|
||||
"""Signifies that a remote class has raised an exception
|
||||
"""Signifies that a remote class has raised an exception.
|
||||
|
||||
Containes a string representation of the type of the original exception,
|
||||
the value of the original exception, and the traceback. These are
|
||||
sent to the parent as a joined string so printing the exception
|
||||
contains all of the relevent info."""
|
||||
contains all of the relevent info.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, exc_type, value, traceback):
|
||||
self.exc_type = exc_type
|
||||
self.value = value
|
||||
self.traceback = traceback
|
||||
super(RemoteError, self).__init__("%s %s\n%s" % (exc_type,
|
||||
super(RemoteError, self).__init__('%s %s\n%s' % (exc_type,
|
||||
value,
|
||||
traceback))
|
||||
|
||||
@@ -339,6 +358,7 @@ def _pack_context(msg, context):
|
||||
context out into a bunch of separate keys. If we want to support
|
||||
more arguments in rabbit messages, we may want to do the same
|
||||
for args at some point.
|
||||
|
||||
"""
|
||||
context = dict([('_context_%s' % key, value)
|
||||
for (key, value) in context.to_dict().iteritems()])
|
||||
@@ -346,11 +366,11 @@ def _pack_context(msg, context):
|
||||
|
||||
|
||||
def call(context, topic, msg):
|
||||
"""Sends a message on a topic and wait for a response"""
|
||||
LOG.debug(_("Making asynchronous call on %s ..."), topic)
|
||||
"""Sends a message on a topic and wait for a response."""
|
||||
LOG.debug(_('Making asynchronous call on %s ...'), topic)
|
||||
msg_id = uuid.uuid4().hex
|
||||
msg.update({'_msg_id': msg_id})
|
||||
LOG.debug(_("MSG_ID is %s") % (msg_id))
|
||||
LOG.debug(_('MSG_ID is %s') % (msg_id))
|
||||
_pack_context(msg, context)
|
||||
|
||||
class WaitMessage(object):
|
||||
@@ -387,8 +407,8 @@ def call(context, topic, msg):
|
||||
|
||||
|
||||
def cast(context, topic, msg):
|
||||
"""Sends a message on a topic without waiting for a response"""
|
||||
LOG.debug(_("Making asynchronous cast on %s..."), topic)
|
||||
"""Sends a message on a topic without waiting for a response."""
|
||||
LOG.debug(_('Making asynchronous cast on %s...'), topic)
|
||||
_pack_context(msg, context)
|
||||
conn = Connection.instance()
|
||||
publisher = TopicPublisher(connection=conn, topic=topic)
|
||||
@@ -397,8 +417,8 @@ def cast(context, topic, msg):
|
||||
|
||||
|
||||
def fanout_cast(context, topic, msg):
|
||||
"""Sends a message on a fanout exchange without waiting for a response"""
|
||||
LOG.debug(_("Making asynchronous fanout cast..."))
|
||||
"""Sends a message on a fanout exchange without waiting for a response."""
|
||||
LOG.debug(_('Making asynchronous fanout cast...'))
|
||||
_pack_context(msg, context)
|
||||
conn = Connection.instance()
|
||||
publisher = FanoutPublisher(topic, connection=conn)
|
||||
@@ -407,14 +427,14 @@ def fanout_cast(context, topic, msg):
|
||||
|
||||
|
||||
def generic_response(message_data, message):
|
||||
"""Logs a result and exits"""
|
||||
"""Logs a result and exits."""
|
||||
LOG.debug(_('response %s'), message_data)
|
||||
message.ack()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def send_message(topic, message, wait=True):
|
||||
"""Sends a message for testing"""
|
||||
"""Sends a message for testing."""
|
||||
msg_id = uuid.uuid4().hex
|
||||
message.update({'_msg_id': msg_id})
|
||||
LOG.debug(_('topic is %s'), topic)
|
||||
@@ -425,14 +445,14 @@ def send_message(topic, message, wait=True):
|
||||
queue=msg_id,
|
||||
exchange=msg_id,
|
||||
auto_delete=True,
|
||||
exchange_type="direct",
|
||||
exchange_type='direct',
|
||||
routing_key=msg_id)
|
||||
consumer.register_callback(generic_response)
|
||||
|
||||
publisher = messaging.Publisher(connection=Connection.instance(),
|
||||
exchange=FLAGS.control_exchange,
|
||||
durable=False,
|
||||
exchange_type="topic",
|
||||
exchange_type='topic',
|
||||
routing_key=topic)
|
||||
publisher.send(message)
|
||||
publisher.close()
|
||||
@@ -441,8 +461,8 @@ def send_message(topic, message, wait=True):
|
||||
consumer.wait()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# NOTE(vish): you can send messages from the command line using
|
||||
# topic and a json sting representing a dictionary
|
||||
# for the method
|
||||
if __name__ == '__main__':
|
||||
# You can send messages from the command line using
|
||||
# topic and a json string representing a dictionary
|
||||
# for the method
|
||||
send_message(sys.argv[1], json.loads(sys.argv[2]))
|
||||
|
@@ -618,7 +618,8 @@ class IptablesFirewallTestCase(test.TestCase):
|
||||
instance_ref = db.instance_create(self.context,
|
||||
{'user_id': 'fake',
|
||||
'project_id': 'fake',
|
||||
'mac_address': '56:12:12:12:12:12'})
|
||||
'mac_address': '56:12:12:12:12:12',
|
||||
'instance_type_id': 1})
|
||||
ip = '10.11.12.13'
|
||||
|
||||
network_ref = db.project_get_network(self.context,
|
||||
@@ -841,7 +842,8 @@ class NWFilterTestCase(test.TestCase):
|
||||
instance_ref = db.instance_create(self.context,
|
||||
{'user_id': 'fake',
|
||||
'project_id': 'fake',
|
||||
'mac_address': '00:A0:C9:14:C8:29'})
|
||||
'mac_address': '00:A0:C9:14:C8:29',
|
||||
'instance_type_id': 1})
|
||||
inst_id = instance_ref['id']
|
||||
|
||||
ip = '10.11.12.13'
|
||||
|
Reference in New Issue
Block a user