Browse Source

Synchronize code from oslo

Use commit eaab5fae2502198e9fa57d0d90a7204a2bd83b16:
Merge "sort options to make --help output prettier"
(Wed Feb 13 12:52:14 2013 +0000)

Add processutils to quantum since impl_zmq depends on them.

Drop notifier.list_notifier that is not present in oslo.

Change-Id: I91d9ec05481b8c24da9fbee1ad4706ff56a3b7aa
Fixes: bug #1116290
changes/26/21226/4
Alessio Ababilov 10 years ago
parent
commit
8789704221
  1. 4
      doc/source/conf.py
  2. 2
      openstack-common.conf
  3. 2
      quantum/common/config.py
  4. 2
      quantum/openstack/common/cfg.py
  5. 11
      quantum/openstack/common/exception.py
  6. 43
      quantum/openstack/common/jsonutils.py
  7. 11
      quantum/openstack/common/local.py
  8. 110
      quantum/openstack/common/lockutils.py
  9. 118
      quantum/openstack/common/notifier/list_notifier.py
  10. 135
      quantum/openstack/common/processutils.py
  11. 60
      quantum/openstack/common/rpc/__init__.py
  12. 2
      quantum/openstack/common/rpc/amqp.py
  13. 2
      quantum/openstack/common/rpc/common.py
  14. 23
      quantum/openstack/common/rpc/impl_kombu.py
  15. 18
      quantum/openstack/common/rpc/impl_qpid.py
  16. 83
      quantum/openstack/common/rpc/impl_zmq.py
  17. 17
      quantum/openstack/common/rpc/matchmaker.py
  18. 4
      quantum/openstack/common/service.py
  19. 8
      quantum/openstack/common/setup.py
  20. 8
      quantum/openstack/common/threadgroup.py
  21. 18
      quantum/openstack/common/timeutils.py
  22. 14
      quantum/openstack/common/version.py
  23. 1
      tools/install_venv_common.py

4
doc/source/conf.py

@ -77,9 +77,9 @@ copyright = u'2011-present, OpenStack, LLC.'
#
# Version info
from quantum.version import version_info as quantum_version
release = quantum_version.version_string_with_vcs()
release = quantum_version.release_string()
# The short X.Y version.
version = quantum_version.canonical_version_string()
version = quantum_version.version_string()
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.

2
openstack-common.conf

@ -1,5 +1,5 @@
[DEFAULT]
# The list of modules to copy from openstack-common
modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,install_venv_common,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,periodic_task,policy,rpc,service,setup,threadgroup,timeutils,uuidutils,version
modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,install_venv_common,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,periodic_task,policy,processutils,rpc,service,setup,threadgroup,timeutils,uuidutils,version
# The base module to hold the copy of openstack.common
base=quantum

2
quantum/common/config.py

@ -84,7 +84,7 @@ rpc.set_defaults(control_exchange='quantum')
def parse(args):
cfg.CONF(args=args, project='quantum',
version='%%prog %s' % quantum_version.version_string_with_vcs())
version='%%prog %s' % quantum_version.release_string())
# Validate that the base_mac is of the correct format
msg = attributes._validate_regex(cfg.CONF.base_mac,

2
quantum/openstack/common/cfg.py

@ -1643,7 +1643,7 @@ class ConfigOpts(collections.Mapping):
"""
self._args = args
for opt, group in self._all_cli_opts():
for opt, group in sorted(self._all_cli_opts()):
opt._add_to_cli(self._oparser, group)
return vars(self._oparser.parse_args(args))

11
quantum/openstack/common/exception.py

@ -23,6 +23,8 @@ import logging
from quantum.openstack.common.gettextutils import _
_FATAL_EXCEPTION_FORMAT_ERRORS = False
class Error(Exception):
def __init__(self, message=None):
@ -121,9 +123,12 @@ class OpenstackException(Exception):
try:
self._error_string = self.message % kwargs
except Exception:
# at least get the core message out if something happened
self._error_string = self.message
except Exception as e:
if _FATAL_EXCEPTION_FORMAT_ERRORS:
raise e
else:
# at least get the core message out if something happened
self._error_string = self.message
def __str__(self):
return self._error_string

43
quantum/openstack/common/jsonutils.py

@ -34,15 +34,21 @@ This module provides a few things:
import datetime
import functools
import inspect
import itertools
import json
import logging
import xmlrpclib
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import timeutils
LOG = logging.getLogger(__name__)
def to_primitive(value, convert_instances=False, level=0):
def to_primitive(value, convert_instances=False, convert_datetime=True,
level=0, max_depth=3):
"""Convert a complex object into primitives.
Handy for JSON serialization. We can optionally handle instances,
@ -78,12 +84,19 @@ def to_primitive(value, convert_instances=False, level=0):
if getattr(value, '__module__', None) == 'mox':
return 'mock'
if level > 3:
if level > max_depth:
LOG.error(_('Max serialization depth exceeded on object: %d %s'),
level, value)
return '?'
# The try block may not be necessary after the class check above,
# but just in case ...
try:
recursive = functools.partial(to_primitive,
convert_instances=convert_instances,
convert_datetime=convert_datetime,
level=level,
max_depth=max_depth)
# It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly
# handled
@ -91,33 +104,19 @@ def to_primitive(value, convert_instances=False, level=0):
value = datetime.datetime(*tuple(value.timetuple())[:6])
if isinstance(value, (list, tuple)):
o = []
for v in value:
o.append(to_primitive(v, convert_instances=convert_instances,
level=level))
return o
return [recursive(v) for v in value]
elif isinstance(value, dict):
o = {}
for k, v in value.iteritems():
o[k] = to_primitive(v, convert_instances=convert_instances,
level=level)
return o
elif isinstance(value, datetime.datetime):
return dict((k, recursive(v)) for k, v in value.iteritems())
elif convert_datetime and isinstance(value, datetime.datetime):
return timeutils.strtime(value)
elif hasattr(value, 'iteritems'):
return to_primitive(dict(value.iteritems()),
convert_instances=convert_instances,
level=level + 1)
return recursive(dict(value.iteritems()), level=level + 1)
elif hasattr(value, '__iter__'):
return to_primitive(list(value),
convert_instances=convert_instances,
level=level)
return recursive(list(value))
elif convert_instances and hasattr(value, '__dict__'):
# Likely an instance of something. Watch for cycles.
# Ignore class member vars.
return to_primitive(value.__dict__,
convert_instances=convert_instances,
level=level + 1)
return recursive(value.__dict__, level=level + 1)
else:
return value
except TypeError:

11
quantum/openstack/common/local.py

@ -26,6 +26,9 @@ class WeakLocal(corolocal.local):
def __getattribute__(self, attr):
rval = corolocal.local.__getattribute__(self, attr)
if rval:
# NOTE(mikal): this bit is confusing. What is stored is a weak
# reference, not the value itself. We therefore need to lookup
# the weak reference and return the inner value here.
rval = rval()
return rval
@ -34,4 +37,12 @@ class WeakLocal(corolocal.local):
return corolocal.local.__setattr__(self, attr, value)
# NOTE(mikal): the name "store" should be deprecated in the future
store = WeakLocal()
# A "weak" store uses weak references and allows an object to fall out of scope
# when it falls out of scope in the code that uses the thread local storage. A
# "strong" store will hold a reference to the object so that it never falls out
# of scope.
weak_store = WeakLocal()
strong_store = corolocal.local

110
quantum/openstack/common/lockutils.py

@ -29,6 +29,7 @@ from eventlet import semaphore
from quantum.openstack.common import cfg
from quantum.openstack.common import fileutils
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import local
from quantum.openstack.common import log as logging
@ -39,9 +40,8 @@ util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path',
default=os.path.abspath(os.path.join(os.path.dirname(__file__),
'../')),
help='Directory to use for lock files')
help=('Directory to use for lock files. Default to a '
'temp directory'))
]
@ -140,7 +140,7 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
def foo(self, *args):
...
ensures that only one thread will execute the bar method at a time.
ensures that only one thread will execute the foo method at a time.
Different methods can share the same lock::
@ -184,54 +184,66 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
LOG.debug(_('Got semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name,
'method': f.__name__})
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
'method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
cleanup_dir = False
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = CONF.lock_path
if not local_lock_path:
cleanup_dir = True
local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
cleanup_dir = True
fileutils.ensure_tree(local_lock_path)
# NOTE(mikal): the lock name cannot contain directory
# separators
safe_name = name.replace(os.sep, '_')
lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
lock_file_path = os.path.join(local_lock_path,
lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s '
'for method "%(method)s"...'),
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s" '
'for method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
cleanup_dir = False
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = CONF.lock_path
if not local_lock_path:
cleanup_dir = True
local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
cleanup_dir = True
fileutils.ensure_tree(local_lock_path)
# NOTE(mikal): the lock name cannot contain directory
# separators
safe_name = name.replace(os.sep, '_')
lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
lock_file_path = os.path.join(local_lock_path,
lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock:
LOG.debug(_('Got file lock "%(lock)s" at '
'%(path)s for method '
'"%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
retval = f(*args, **kwargs)
finally:
LOG.debug(_('Released file lock "%(lock)s" at '
'%(path)s for method "%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
retval = f(*args, **kwargs)
finally:
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'
' for method "%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to cleanup
# the locks left behind by unit tests.
if cleanup_dir:
shutil.rmtree(local_lock_path)
else:
retval = f(*args, **kwargs)
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to
# cleanup the locks left behind by unit
# tests.
if cleanup_dir:
shutil.rmtree(local_lock_path)
else:
retval = f(*args, **kwargs)
finally:
local.strong_store.locks_held.remove(name)
return retval
return inner

118
quantum/openstack/common/notifier/list_notifier.py

@ -1,118 +0,0 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
list_notifier_drivers_opt = cfg.MultiStrOpt(
'list_notifier_drivers',
default=['quantum.openstack.common.notifier.no_op_notifier'],
help='List of drivers to send notifications')
CONF = cfg.CONF
CONF.register_opt(list_notifier_drivers_opt)
LOG = logging.getLogger(__name__)
drivers = None
class ImportFailureNotifier(object):
"""Noisily re-raises some exception over-and-over when notify is called."""
def __init__(self, exception):
self.exception = exception
def notify(self, context, message):
raise self.exception
def _get_drivers():
"""Instantiates and returns drivers based on the flag values."""
global drivers
if drivers is None:
drivers = []
for notification_driver in CONF.list_notifier_drivers:
try:
drivers.append(importutils.import_module(notification_driver))
except ImportError as e:
drivers.append(ImportFailureNotifier(e))
return drivers
def add_driver(notification_driver):
"""Add a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
if isinstance(notification_driver, basestring):
# Load and add
try:
drivers.append(importutils.import_module(notification_driver))
except ImportError as e:
drivers.append(ImportFailureNotifier(e))
else:
# Driver is already loaded; just add the object.
drivers.append(notification_driver)
def _object_name(obj):
name = []
if hasattr(obj, '__module__'):
name.append(obj.__module__)
if hasattr(obj, '__name__'):
name.append(obj.__name__)
else:
name.append(obj.__class__.__name__)
return '.'.join(name)
def remove_driver(notification_driver):
"""Remove a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
removed = False
if notification_driver in drivers:
# We're removing an object. Easy.
drivers.remove(notification_driver)
removed = True
else:
# We're removing a driver by name. Search for it.
for driver in drivers:
if _object_name(driver) == notification_driver:
drivers.remove(driver)
removed = True
if not removed:
raise ValueError("Cannot remove; %s is not in list" %
notification_driver)
def notify(context, message):
"""Passes notification to multiple notifiers in a list."""
for driver in _get_drivers():
try:
driver.notify(context, message)
except Exception as e:
LOG.exception(_("Problem '%(e)s' attempting to send to "
"notification driver %(driver)s."), locals())
def _reset_drivers():
"""Used by unit tests to reset the drivers."""
global drivers
drivers = None

135
quantum/openstack/common/processutils.py

@ -0,0 +1,135 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
System-level utilities and helper functions.
"""
import logging
import random
import shlex
from eventlet.green import subprocess
from eventlet import greenthread
from quantum.openstack.common.gettextutils import _
LOG = logging.getLogger(__name__)
class UnknownArgumentError(Exception):
def __init__(self, message=None):
super(UnknownArgumentError, self).__init__(message)
class ProcessExecutionError(Exception):
def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
description=None):
if description is None:
description = "Unexpected error while running command."
if exit_code is None:
exit_code = '-'
message = ("%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r"
% (description, cmd, exit_code, stdout, stderr))
super(ProcessExecutionError, self).__init__(message)
def execute(*cmd, **kwargs):
"""
Helper method to shell out and execute a command through subprocess with
optional retry.
:param cmd: Passed to subprocess.Popen.
:type cmd: string
:param process_input: Send to opened process.
:type proces_input: string
:param check_exit_code: Defaults to 0. Will raise
:class:`ProcessExecutionError`
if the command exits without returning this value
as a returncode
:type check_exit_code: int
:param delay_on_retry: True | False. Defaults to True. If set to True,
wait a short amount of time before retrying.
:type delay_on_retry: boolean
:param attempts: How many times to retry cmd.
:type attempts: int
:param run_as_root: True | False. Defaults to False. If set to True,
the command is prefixed by the command specified
in the root_helper kwarg.
:type run_as_root: boolean
:param root_helper: command to prefix all cmd's with
:type root_helper: string
:returns: (stdout, stderr) from process execution
:raises: :class:`UnknownArgumentError` on
receiving unknown arguments
:raises: :class:`ProcessExecutionError`
"""
process_input = kwargs.pop('process_input', None)
check_exit_code = kwargs.pop('check_exit_code', 0)
delay_on_retry = kwargs.pop('delay_on_retry', True)
attempts = kwargs.pop('attempts', 1)
run_as_root = kwargs.pop('run_as_root', False)
root_helper = kwargs.pop('root_helper', '')
if len(kwargs):
raise UnknownArgumentError(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs)
if run_as_root:
cmd = shlex.split(root_helper) + list(cmd)
cmd = map(str, cmd)
while attempts > 0:
attempts -= 1
try:
LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
_PIPE = subprocess.PIPE # pylint: disable=E1101
obj = subprocess.Popen(cmd,
stdin=_PIPE,
stdout=_PIPE,
stderr=_PIPE,
close_fds=True)
result = None
if process_input is not None:
result = obj.communicate(process_input)
else:
result = obj.communicate()
obj.stdin.close() # pylint: disable=E1101
_returncode = obj.returncode # pylint: disable=E1101
if _returncode:
LOG.debug(_('Result was %s') % _returncode)
if (isinstance(check_exit_code, int) and
not isinstance(check_exit_code, bool) and
_returncode != check_exit_code):
(stdout, stderr) = result
raise ProcessExecutionError(exit_code=_returncode,
stdout=stdout,
stderr=stderr,
cmd=' '.join(cmd))
return result
except ProcessExecutionError:
if not attempts:
raise
else:
LOG.debug(_('%r failed. Retrying.'), cmd)
if delay_on_retry:
greenthread.sleep(random.randint(20, 200) / 100.0)
finally:
# NOTE(termie): this appears to be necessary to let the subprocess
# call clean something up in between calls, without
# it two execute calls in a row hangs the second one
greenthread.sleep(0)

60
quantum/openstack/common/rpc/__init__.py

@ -25,8 +25,16 @@ For some wrappers that add message versioning to rpc, see:
rpc.proxy
"""
import inspect
import logging
from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
from quantum.openstack.common import local
LOG = logging.getLogger(__name__)
rpc_opts = [
@ -62,7 +70,8 @@ rpc_opts = [
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
cfg.CONF.register_opts(rpc_opts)
CONF = cfg.CONF
CONF.register_opts(rpc_opts)
def set_defaults(control_exchange):
@ -83,10 +92,27 @@ def create_connection(new=True):
:returns: An instance of openstack.common.rpc.common.Connection
"""
return _get_impl().create_connection(cfg.CONF, new=new)
return _get_impl().create_connection(CONF, new=new)
def _check_for_lock():
if not CONF.debug:
return None
if ((hasattr(local.strong_store, 'locks_held')
and local.strong_store.locks_held)):
stack = ' :: '.join([frame[3] for frame in inspect.stack()])
LOG.warn(_('A RPC is being made while holding a lock. The locks '
'currently held are %(locks)s. This is probably a bug. '
'Please report it. Include the following: [%(stack)s].'),
{'locks': local.strong_store.locks_held,
'stack': stack})
return True
return False
def call(context, topic, msg, timeout=None):
def call(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
@ -100,13 +126,17 @@ def call(context, topic, msg, timeout=None):
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if a RPC call is made
with a lock held.
:returns: A dict from the remote method.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
if check_for_lock:
_check_for_lock()
return _get_impl().call(CONF, context, topic, msg, timeout)
def cast(context, topic, msg):
@ -124,7 +154,7 @@ def cast(context, topic, msg):
:returns: None
"""
return _get_impl().cast(cfg.CONF, context, topic, msg)
return _get_impl().cast(CONF, context, topic, msg)
def fanout_cast(context, topic, msg):
@ -145,10 +175,10 @@ def fanout_cast(context, topic, msg):
:returns: None
"""
return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
return _get_impl().fanout_cast(CONF, context, topic, msg)
def multicall(context, topic, msg, timeout=None):
def multicall(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
@ -166,6 +196,8 @@ def multicall(context, topic, msg, timeout=None):
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if a RPC call is made
with a lock held.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
@ -175,7 +207,9 @@ def multicall(context, topic, msg, timeout=None):
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
if check_for_lock:
_check_for_lock()
return _get_impl().multicall(CONF, context, topic, msg, timeout)
def notify(context, topic, msg, envelope=False):
@ -217,7 +251,7 @@ def cast_to_server(context, server_params, topic, msg):
:returns: None
"""
return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
return _get_impl().cast_to_server(CONF, context, server_params, topic,
msg)
@ -233,7 +267,7 @@ def fanout_cast_to_server(context, server_params, topic, msg):
:returns: None
"""
return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
return _get_impl().fanout_cast_to_server(CONF, context, server_params,
topic, msg)
@ -263,10 +297,10 @@ def _get_impl():
global _RPCIMPL
if _RPCIMPL is None:
try:
_RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
_RPCIMPL = importutils.import_module(CONF.rpc_backend)
except ImportError:
# For backwards compatibility with older nova config.
impl = cfg.CONF.rpc_backend.replace('nova.rpc',
'nova.openstack.common.rpc')
impl = CONF.rpc_backend.replace('nova.rpc',
'nova.openstack.common.rpc')
_RPCIMPL = importutils.import_module(impl)
return _RPCIMPL

2
quantum/openstack/common/rpc/amqp.py

@ -368,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg))
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg

2
quantum/openstack/common/rpc/common.py

@ -289,7 +289,7 @@ def deserialize_remote_exception(conf, data):
# NOTE(ameade): We DO NOT want to allow just any module to be imported, in
# order to prevent arbitrary code execution.
if not module in conf.allowed_rpc_exception_modules:
if module not in conf.allowed_rpc_exception_modules:
return RemoteError(name, failure.get('message'), trace)
try:

23
quantum/openstack/common/rpc/impl_kombu.py

@ -66,7 +66,8 @@ kombu_opts = [
help='the RabbitMQ userid'),
cfg.StrOpt('rabbit_password',
default='guest',
help='the RabbitMQ password'),
help='the RabbitMQ password',
secret=True),
cfg.StrOpt('rabbit_virtual_host',
default='/',
help='the RabbitMQ virtual host'),
@ -302,9 +303,15 @@ class Publisher(object):
channel=channel,
routing_key=self.routing_key)
def send(self, msg):
def send(self, msg, timeout=None):
"""Send a message"""
self.producer.publish(msg)
if timeout:
#
# AMQP TTL is in milliseconds when set in the header.
#
self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
else:
self.producer.publish(msg)
class DirectPublisher(Publisher):
@ -653,7 +660,7 @@ class Connection(object):
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg, **kwargs):
def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
"""Send to a publisher based on the publisher class"""
def _error_callback(exc):
@ -663,7 +670,7 @@ class Connection(object):
def _publish():
publisher = cls(self.conf, self.channel, topic, **kwargs)
publisher.send(msg)
publisher.send(msg, timeout)
self.ensure(_error_callback, _publish)
@ -691,9 +698,9 @@ class Connection(object):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg):
def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
self.publisher_send(TopicPublisher, topic, msg)
self.publisher_send(TopicPublisher, topic, msg, timeout)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
@ -701,7 +708,7 @@ class Connection(object):
def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic"""
self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
def consume(self, limit=None):
"""Consume from all queues/consumers"""

18
quantum/openstack/common/rpc/impl_qpid.py

@ -51,7 +51,8 @@ qpid_opts = [
help='Username for qpid connection'),
cfg.StrOpt('qpid_password',
default='',
help='Password for qpid connection'),
help='Password for qpid connection',
secret=True),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
help='Space separated list of SASL mechanisms to use for auth'),
@ -486,9 +487,20 @@ class Connection(object):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg):
def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
self.publisher_send(TopicPublisher, topic, msg)
#
# We want to create a message with attributes, e.g. a TTL. We
# don't really need to keep 'msg' in its JSON format any longer
# so let's create an actual qpid message here and get some
# value-add on the go.
#
# WARNING: Request timeout happens to be in the same units as
# qpid's TTL (seconds). If this changes in the future, then this
# will need to be altered accordingly.
#
qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
self.publisher_send(TopicPublisher, topic, qpid_message)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""

83
quantum/openstack/common/rpc/impl_zmq.py

@ -17,7 +17,6 @@
import os
import pprint
import socket
import string
import sys
import types
import uuid
@ -90,7 +89,7 @@ def _serialize(data):
Error if a developer passes us bad data.
"""
try:
return str(jsonutils.dumps(data, ensure_ascii=True))
return jsonutils.dumps(data, ensure_ascii=True)
except TypeError:
LOG.error(_("JSON serialization failed."))
raise
@ -218,10 +217,11 @@ class ZmqClient(object):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
msg_id = msg_id or 0
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)])
self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
def close(self):
self.outq.close()
@ -295,13 +295,13 @@ class InternalContext(object):
ctx.replies)
LOG.debug(_("Sending reply"))
cast(CONF, ctx, topic, {
_multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {
'msg_id': msg_id,
'msg_id': msg_id, # Include for Folsom compat.
'response': response
}
})
}, _msg_id=msg_id)
class ConsumerBase(object):
@ -321,21 +321,22 @@ class ConsumerBase(object):
return [result]
def process(self, style, target, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
# Method starting with - are
# processed internally. (non-valid method name)
method = data['method']
method = data.get('method')
if not method:
LOG.error(_("RPC message did not include method."))
return
# Internal method
# uses internal context for safety.
if data['method'][0] == '-':
# For reply / process_reply
method = method[1:]
if method == 'reply':
self.private_ctx.reply(ctx, proxy, **data['args'])
if method == '-reply':
self.private_ctx.reply(ctx, proxy, **data['args'])
return
data.setdefault('version', None)
data.setdefault('args', {})
proxy.dispatch(ctx, data['version'],
data['method'], **data['args'])
@ -436,20 +437,12 @@ class ZmqProxy(ZmqBaseReactor):
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
# Handle zmq_replies magic
if topic.startswith('fanout~'):
sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
data = [str(msg_id), _serialize(response)]
else:
sock_type = zmq.PUSH
if not topic in self.topic_proxy:
if topic not in self.topic_proxy:
def publisher(waiter):
LOG.info(_("Creating proxy for topic: %s"), topic)
@ -600,8 +593,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
force_envelope=False):
def _cast(addr, context, topic, msg, timeout=None, serialize=True,
force_envelope=False, _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@ -610,7 +603,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)
# assumes cast can't return an exception
conn.cast(msg_id, topic, payload, serialize, force_envelope)
conn.cast(_msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@ -618,7 +611,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn.close()
def _call(addr, context, msg_id, topic, msg, timeout=None,
def _call(addr, context, topic, msg, timeout=None,
serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@ -654,7 +647,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
)
LOG.debug(_("Sending cast"))
_cast(addr, context, msg_id, topic, payload,
_cast(addr, context, topic, payload,
serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply"))
@ -662,10 +655,12 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
responses = _deserialize(msg[-1])
responses = _deserialize(msg[-1])[-1]['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
except (IndexError, KeyError):
raise RPCException(_("RPC Message Invalid."))
finally:
if 'msg_waiter' in vars():
msg_waiter.close()
@ -682,7 +677,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False):
force_envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@ -708,10 +703,10 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
_topic, _topic, msg, timeout, serialize,
force_envelope)
_topic, msg, timeout, serialize,
force_envelope, _msg_id)
return
return method(_addr, context, _topic, _topic, msg, timeout,
return method(_addr, context, _topic, msg, timeout,
serialize, force_envelope)
@ -777,21 +772,9 @@ def _get_ctxt():
return ZMQ_CTX
def _get_matchmaker():
def _get_matchmaker(*args, **kwargs):
global matchmaker
if not matchmaker:
# rpc_zmq_matchmaker should be set to a 'module.Class'
mm_path = CONF.rpc_zmq_matchmaker.split('.')
mm_module = '.'.join(mm_path[:-1])
mm_class = mm_path[-1]
# Only initialize a class.
if mm_path[-1][0] not in string.ascii_uppercase:
LOG.error(_("Matchmaker could not be loaded.\n"
"rpc_zmq_matchmaker is not a class."))
raise RPCException(_("Error loading Matchmaker."))
mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor()
matchmaker = importutils.import_object(
CONF.rpc_zmq_matchmaker, *args, **kwargs)
return matchmaker

17
quantum/openstack/common/rpc/matchmaker.py

@ -201,24 +201,25 @@ class FanoutRingExchange(RingExchange):
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
def __init__(self):
def __init__(self, host='localhost'):
self.host = host
super(Exchange, self).__init__()
def run(self, key):
return [(key.split('.')[0] + '.localhost', 'localhost')]
return [('.'.join((key.split('.')[0], self.host)), self.host)]
class DirectExchange(Exchange):
"""
Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute" running on "host"
i.e. "compute.host" sends a message to "compute.host" running on "host"
"""
def __init__(self):
super(Exchange, self).__init__()
def run(self, key):
b, e = key.split('.', 1)
return [(b, e)]
e = key.split('.', 1)[1]
return [(key, e)]
class MatchMakerRing(MatchMakerBase):
@ -237,11 +238,11 @@ class MatchMakerLocalhost(MatchMakerBase):
Match Maker where all bare topics resolve to localhost.
Useful for testing.
"""
def __init__(self):
def __init__(self, host='localhost'):
super(MatchMakerLocalhost, self).__init__()
self.add_binding(FanoutBinding(), LocalhostExchange())
self.add_binding(FanoutBinding(), LocalhostExchange(host))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange())
self.add_binding(TopicBinding(), LocalhostExchange(host))
class MatchMakerStub(MatchMakerBase):

4
quantum/openstack/common/service.py

@ -51,7 +51,7 @@ class Launcher(object):
:returns: None
"""
self._services = threadgroup.ThreadGroup('launcher')
self._services = threadgroup.ThreadGroup()
eventlet_backdoor.initialize_if_enabled()
@staticmethod
@ -310,7 +310,7 @@ class Service(object):
"""Service object for binaries running on hosts."""
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup('service', threads)
self.tg = threadgroup.ThreadGroup(threads)
def start(self):
pass

8
quantum/openstack/common/setup.py

@ -274,7 +274,7 @@ def _get_revno():
return len(revlist.splitlines())
def get_version_from_git(pre_version):
def _get_version_from_git(pre_version):
"""Return a version which is equal to the tag that's on the current
revision if there is one, or tag plus number of additional revisions
if the current revision has no tag."""
@ -294,7 +294,7 @@ def get_version_from_git(pre_version):
return None
def get_version_from_pkg_info(package_name):
def _get_version_from_pkg_info(package_name):
"""Get the version from PKG-INFO file if we can."""
try:
pkg_info_file = open('PKG-INFO', 'r')
@ -325,10 +325,10 @@ def get_version(package_name, pre_version=None):
version = os.environ.get("OSLO_PACKAGE_VERSION", None)
if version:
return version
version = get_version_from_pkg_info(package_name)
version = _get_version_from_pkg_info(package_name)
if version:
return version
version = get_version_from_git(pre_version)
version = _get_version_from_git(pre_version)
if version:
return version
raise Exception("Versioning for this project requires either an sdist"

8
quantum/openstack/common/threadgroup.py

@ -38,8 +38,7 @@ class Thread(object):
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it has done so it can be removed from the threads list.
"""
def __init__(self, name, thread, group):
self.name = name
def __init__(self, thread, group):
self.thread = thread
self.thread.link(_thread_done, group=group, thread=self)
@ -57,8 +56,7 @@ class ThreadGroup(object):
when need be).
* provide an easy API to add timers.
"""
def __init__(self, name, thread_pool_size=10):
self.name = name
def __init__(self, thread_pool_size=10):
self.pool = greenpool.GreenPool(thread_pool_size)
self.threads = []
self.timers = []
@ -72,7 +70,7 @@ class ThreadGroup(object):
def add_thread(self, callback, *args, **kwargs):
gt = self.pool.spawn(callback, *args, **kwargs)
th = Thread(callback.__name__, gt, self)
th = Thread(gt, self)
self.threads.append(th)
def thread_done(self, thread):

18
quantum/openstack/common/timeutils.py

@ -98,6 +98,11 @@ def utcnow():
return datetime.datetime.utcnow()
def iso8601_from_timestamp(timestamp):
"""Returns a iso8601 formated date from timestamp"""
return isotime(datetime.datetime.utcfromtimestamp(timestamp))
utcnow.override_time = None
@ -162,3 +167,16 @@ def delta_seconds(before, after):
except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6))
def is_soon(dt, window):
"""
Determines if time is going to happen in the next window seconds.
:params dt: the time
:params window: minimum seconds to remain to consider the time not soon