Perform a sync with oslo-incubator.

This oslo-incubator sync pulls in a new log.py which will make quantum's
default log output format the same as nova, glance and cinder (once
cinder's corresponding oslo sync merges). This common log format
simplifies log indexing as part of CI and makes lives easier for
deployers.

This sync does add a requirement on six as jsonutils depends on it. It
updates install_venv_common.py to be python26 compatible. It also brings
in a bunch of recent python3 compatibility that was added to oslo.

Fixes bug 1183144

Change-Id: Id0f196d7b5680e5950e4a27d66042bf00ccd49e6
This commit is contained in:
Clark Boylan 2013-05-22 14:44:48 -07:00
parent b03240b9bf
commit b5bf227db4
22 changed files with 471 additions and 172 deletions

View File

@ -16,6 +16,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import gc
import pprint
import sys
@ -37,7 +39,7 @@ CONF.register_opts(eventlet_backdoor_opts)
def _dont_use_this():
print "Don't use this, just disconnect instead"
print("Don't use this, just disconnect instead")
def _find_objects(t):
@ -46,16 +48,16 @@ def _find_objects(t):
def _print_greenthreads():
for i, gt in enumerate(_find_objects(greenlet.greenlet)):
print i, gt
print(i, gt)
traceback.print_stack(gt.gr_frame)
print
print()
def _print_nativethreads():
for threadId, stack in sys._current_frames().items():
print threadId
print(threadId)
traceback.print_stack(stack)
print
print()
def initialize_if_enabled():

View File

@ -98,7 +98,7 @@ def wrap_exception(f):
def _wrap(*args, **kw):
try:
return f(*args, **kw)
except Exception, e:
except Exception as e:
if not isinstance(e, Error):
#exc_type, exc_value, exc_traceback = sys.exc_info()
logging.exception(_('Uncaught exception'))

View File

@ -41,6 +41,8 @@ import json
import types
import xmlrpclib
import six
from quantum.openstack.common import timeutils
@ -93,7 +95,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# value of itertools.count doesn't get caught by nasty_type_tests
# and results in infinite loop when list(value) is called.
if type(value) == itertools.count:
return unicode(value)
return six.text_type(value)
# FIXME(vish): Workaround for LP bug 852095. Without this workaround,
# tests that raise an exception in a mocked method that
@ -137,12 +139,12 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
return recursive(value.__dict__, level=level + 1)
else:
if any(test(value) for test in _nasty_type_tests):
return unicode(value)
return six.text_type(value)
return value
except TypeError:
# Class objects are tricky since they may define something like
# __iter__ defined but it isn't callable as list().
return unicode(value)
return six.text_type(value)
def dumps(value, default=to_primitive, **kwargs):

View File

@ -49,6 +49,10 @@ CONF = cfg.CONF
CONF.register_opts(util_opts)
def set_defaults(lock_path):
cfg.set_defaults(util_opts, lock_path=lock_path)
class _InterProcessLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
@ -82,7 +86,7 @@ class _InterProcessLock(object):
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
return self
except IOError, e:
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
@ -247,3 +251,28 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
return retval
return inner
return wrap
def synchronized_with_prefix(lock_file_prefix):
"""Partial object generator for the synchronization decorator.
Redefine @synchronized in each project like so::
(in nova/utils.py)
from nova.openstack.common import lockutils
synchronized = lockutils.synchronized_with_prefix('nova-')
(in nova/foo.py)
from nova import utils
@utils.synchronized('mylock')
def bar(self, *args):
...
The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix. The prefix should end with a hyphen ('-') if specified.
"""
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)

View File

@ -37,19 +37,17 @@ import logging
import logging.config
import logging.handlers
import os
import stat
import sys
import traceback
from oslo.config import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils
from quantum.openstack.common import local
from quantum.openstack.common import notifier
_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
common_cli_opts = [
@ -74,11 +72,13 @@ logging_cli_opts = [
'documentation for details on logging configuration '
'files.'),
cfg.StrOpt('log-format',
default=_DEFAULT_LOG_FORMAT,
default=None,
metavar='FORMAT',
help='A logging.Formatter log message format string which may '
'use any of the available logging.LogRecord attributes. '
'Default: %(default)s'),
'This option is deprecated. Please use '
'logging_context_format_string and '
'logging_default_format_string instead.'),
cfg.StrOpt('log-date-format',
default=_DEFAULT_LOG_DATE_FORMAT,
metavar='DATE_FORMAT',
@ -104,10 +104,7 @@ logging_cli_opts = [
generic_log_opts = [
cfg.BoolOpt('use_stderr',
default=True,
help='Log output to standard error'),
cfg.StrOpt('logfile_mode',
default='0644',
help='Default file mode used when creating log files'),
help='Log output to standard error')
]
log_opts = [
@ -211,7 +208,27 @@ def _get_log_file_path(binary=None):
return '%s.log' % (os.path.join(logdir, binary),)
class ContextAdapter(logging.LoggerAdapter):
class BaseLoggerAdapter(logging.LoggerAdapter):
def audit(self, msg, *args, **kwargs):
self.log(logging.AUDIT, msg, *args, **kwargs)
class LazyAdapter(BaseLoggerAdapter):
def __init__(self, name='unknown', version='unknown'):
self._logger = None
self.extra = {}
self.name = name
self.version = version
@property
def logger(self):
if not self._logger:
self._logger = getLogger(self.name, self.version)
return self._logger
class ContextAdapter(BaseLoggerAdapter):
warn = logging.LoggerAdapter.warning
def __init__(self, logger, project_name, version_string):
@ -219,8 +236,9 @@ class ContextAdapter(logging.LoggerAdapter):
self.project = project_name
self.version = version_string
def audit(self, msg, *args, **kwargs):
self.log(logging.AUDIT, msg, *args, **kwargs)
@property
def handlers(self):
return self.logger.handlers
def deprecated(self, msg, *args, **kwargs):
stdmsg = _("Deprecated: %s") % msg
@ -304,17 +322,6 @@ class JSONFormatter(logging.Formatter):
return jsonutils.dumps(message)
class PublishErrorsHandler(logging.Handler):
def emit(self, record):
if ('quantum.openstack.common.notifier.log_notifier' in
CONF.notification_driver):
return
notifier.api.notify(None, 'error.publisher',
'error_notification',
notifier.api.ERROR,
dict(error=record.msg))
def _create_logging_excepthook(product_name):
def logging_excepthook(type, value, tb):
extra = {}
@ -340,7 +347,7 @@ class LogConfigError(Exception):
def _load_log_config(log_config):
try:
logging.config.fileConfig(log_config)
except ConfigParser.Error, exc:
except ConfigParser.Error as exc:
raise LogConfigError(log_config, str(exc))
@ -399,11 +406,6 @@ def _setup_logging_from_conf():
filelog = logging.handlers.WatchedFileHandler(logpath)
log_root.addHandler(filelog)
mode = int(CONF.logfile_mode, 8)
st = os.stat(logpath)
if st.st_mode != (stat.S_IFREG | mode):
os.chmod(logpath, mode)
if CONF.use_stderr:
streamlog = ColorHandler()
log_root.addHandler(streamlog)
@ -415,15 +417,22 @@ def _setup_logging_from_conf():
log_root.addHandler(streamlog)
if CONF.publish_errors:
log_root.addHandler(PublishErrorsHandler(logging.ERROR))
handler = importutils.import_object(
"quantum.openstack.common.log_handler.PublishErrorsHandler",
logging.ERROR)
log_root.addHandler(handler)
datefmt = CONF.log_date_format
for handler in log_root.handlers:
datefmt = CONF.log_date_format
# NOTE(alaski): CONF.log_format overrides everything currently. This
# should be deprecated in favor of context aware formatting.
if CONF.log_format:
handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
datefmt=datefmt))
log_root.info('Deprecated: log_format is now deprecated and will '
'be removed in the next release')
else:
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
handler.setFormatter(ContextFormatter(datefmt=datefmt))
if CONF.debug:
log_root.setLevel(logging.DEBUG)
@ -449,6 +458,15 @@ def getLogger(name='unknown', version='unknown'):
return _loggers[name]
def getLazyLogger(name='unknown', version='unknown'):
"""
create a pass-through logger that does not create the real logger
until it is really needed and delegates all calls to the real logger
once it is created
"""
return LazyAdapter(name, version)
class WritableLogger(object):
"""A thin wrapper that responds to `write` and logs."""
@ -460,7 +478,7 @@ class WritableLogger(object):
self.logger.log(self.level, msg)
class LegacyFormatter(logging.Formatter):
class ContextFormatter(logging.Formatter):
"""A context.RequestContext aware formatter configured through flags.
The flags used to set format strings are: logging_context_format_string

View File

@ -84,7 +84,7 @@ class FixedIntervalLoopingCall(LoopingCallBase):
LOG.warn(_('task run outlasted interval by %s sec') %
-delay)
greenthread.sleep(delay if delay > 0 else 0)
except LoopingCallDone, e:
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
@ -131,7 +131,7 @@ class DynamicLoopingCall(LoopingCallBase):
LOG.debug(_('Dynamic looping call sleeping for %.02f '
'seconds'), idle)
greenthread.sleep(idle)
except LoopingCallDone, e:
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:

View File

@ -15,6 +15,7 @@
import datetime
import time
from oslo.config import cfg
from quantum.openstack.common.gettextutils import _

View File

@ -60,6 +60,7 @@ import abc
import re
import urllib
import six
import urllib2
from quantum.openstack.common.gettextutils import _
@ -436,7 +437,7 @@ def _parse_list_rule(rule):
or_list.append(AndCheck(and_list))
# If we have only one check, omit the "or"
if len(or_list) == 0:
if not or_list:
return FalseCheck()
elif len(or_list) == 1:
return or_list[0]
@ -738,6 +739,7 @@ class RuleCheck(Check):
class RoleCheck(Check):
def __call__(self, target, creds):
"""Check that there is a matching role in the cred dict."""
return self.match.lower() in [x.lower() for x in creds['roles']]
@ -774,5 +776,5 @@ class GenericCheck(Check):
# TODO(termie): do dict inspection via dot syntax
match = self.match % target
if self.kind in creds:
return match == unicode(creds[self.kind])
return match == six.text_type(creds[self.kind])
return False

View File

@ -34,6 +34,11 @@ from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class InvalidArgumentError(Exception):
def __init__(self, message=None):
super(InvalidArgumentError, self).__init__(message)
class UnknownArgumentError(Exception):
def __init__(self, message=None):
super(UnknownArgumentError, self).__init__(message)
@ -118,7 +123,7 @@ def execute(*cmd, **kwargs):
elif isinstance(check_exit_code, int):
check_exit_code = [check_exit_code]
if len(kwargs):
if kwargs:
raise UnknownArgumentError(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs)
@ -179,3 +184,64 @@ def execute(*cmd, **kwargs):
# call clean something up in between calls, without
# it two execute calls in a row hangs the second one
greenthread.sleep(0)
def trycmd(*args, **kwargs):
"""
A wrapper around execute() to more easily handle warnings and errors.
Returns an (out, err) tuple of strings containing the output of
the command's stdout and stderr. If 'err' is not empty then the
command can be considered to have failed.
:discard_warnings True | False. Defaults to False. If set to True,
then for succeeding commands, stderr is cleared
"""
discard_warnings = kwargs.pop('discard_warnings', False)
try:
out, err = execute(*args, **kwargs)
failed = False
except ProcessExecutionError, exn:
out, err = '', str(exn)
failed = True
if not failed and discard_warnings and err:
# Handle commands that output to stderr but otherwise succeed
err = ''
return out, err
def ssh_execute(ssh, cmd, process_input=None,
addl_env=None, check_exit_code=True):
LOG.debug(_('Running cmd (SSH): %s'), cmd)
if addl_env:
raise InvalidArgumentError(_('Environment not supported over SSH'))
if process_input:
# This is (probably) fixable if we need it...
raise InvalidArgumentError(_('process_input not supported over SSH'))
stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd)
channel = stdout_stream.channel
# NOTE(justinsb): This seems suspicious...
# ...other SSH clients have buffering issues with this approach
stdout = stdout_stream.read()
stderr = stderr_stream.read()
stdin_stream.close()
exit_status = channel.recv_exit_status()
# exit_status == -1 if no exit code was returned
if exit_status != -1:
LOG.debug(_('Result was %s') % exit_status)
if check_exit_code and exit_status != 0:
raise ProcessExecutionError(exit_code=exit_status,
stdout=stdout,
stderr=stderr,
cmd=cmd)
return (stdout, stderr)

View File

@ -197,8 +197,9 @@ class ReplyProxy(ConnectionContext):
msg_id = message_data.pop('_msg_id', None)
waiter = self._call_waiters.get(msg_id)
if not waiter:
LOG.warn(_('no calling threads waiting for msg_id : %s'
', message : %s') % (msg_id, message_data))
LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s'
', message : %(data)s'), {'msg_id': msg_id,
'data': message_data})
else:
waiter.put(message_data)

View File

@ -22,6 +22,7 @@ import sys
import traceback
from oslo.config import cfg
import six
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
@ -157,6 +158,10 @@ class UnsupportedRpcEnvelopeVersion(RPCException):
"not supported by this endpoint.")
class RpcVersionCapError(RPCException):
message = _("Specified RPC version cap, %(version_cap)s, is too low")
class Connection(object):
"""A connection, returned by rpc.create_connection().
@ -299,7 +304,8 @@ def serialize_remote_exception(failure_info, log_failure=True):
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
if log_failure:
LOG.error(_("Returning exception %s to caller"), unicode(failure))
LOG.error(_("Returning exception %s to caller"),
six.text_type(failure))
LOG.error(tb)
kwargs = {}
@ -309,7 +315,7 @@ def serialize_remote_exception(failure_info, log_failure=True):
data = {
'class': str(failure.__class__.__name__),
'module': str(failure.__class__.__module__),
'message': unicode(failure),
'message': six.text_type(failure),
'tb': tb,
'args': failure.args,
'kwargs': kwargs

View File

@ -84,6 +84,7 @@ minimum version that supports the new parameter should be specified.
"""
from quantum.openstack.common.rpc import common as rpc_common
from quantum.openstack.common.rpc import serializer as rpc_serializer
class RpcDispatcher(object):
@ -93,16 +94,38 @@ class RpcDispatcher(object):
contains a list of underlying managers that have an API_VERSION attribute.
"""
def __init__(self, callbacks):
def __init__(self, callbacks, serializer=None):
"""Initialize the rpc dispatcher.
:param callbacks: List of proxy objects that are an instance
of a class with rpc methods exposed. Each proxy
object should have an RPC_API_VERSION attribute.
:param serializer: The Serializer object that will be used to
deserialize arguments before the method call and
to serialize the result after it returns.
"""
self.callbacks = callbacks
if serializer is None:
serializer = rpc_serializer.NoOpSerializer()
self.serializer = serializer
super(RpcDispatcher, self).__init__()
def _deserialize_args(self, context, kwargs):
"""Helper method called to deserialize args before dispatch.
This calls our serializer on each argument, returning a new set of
args that have been deserialized.
:param context: The request context
:param kwargs: The arguments to be deserialized
:returns: A new set of deserialized args
"""
new_kwargs = dict()
for argname, arg in kwargs.iteritems():
new_kwargs[argname] = self.serializer.deserialize_entity(context,
arg)
return new_kwargs
def dispatch(self, ctxt, version, method, namespace, **kwargs):
"""Dispatch a message based on a requested version.
@ -145,7 +168,9 @@ class RpcDispatcher(object):
if not hasattr(proxyobj, method):
continue
if is_compatible:
return getattr(proxyobj, method)(ctxt, **kwargs)
kwargs = self._deserialize_args(ctxt, kwargs)
result = getattr(proxyobj, method)(ctxt, **kwargs)
return self.serializer.serialize_entity(ctxt, result)
if had_compatible:
raise AttributeError("No such RPC function '%s'" % method)

View File

@ -375,7 +375,7 @@ class Connection(object):
try:
return method(*args, **kwargs)
except (qpid_exceptions.Empty,
qpid_exceptions.ConnectionError), e:
qpid_exceptions.ConnectionError) as e:
if error_callback:
error_callback(e)
self.reconnect()

View File

@ -180,7 +180,7 @@ class ZmqSocket(object):
return
# We must unsubscribe, or we'll leak descriptors.
if len(self.subscriptions) > 0:
if self.subscriptions:
for f in self.subscriptions:
try:
self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
@ -763,7 +763,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
if len(queues) == 0:
if not queues:
LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie.
@ -846,6 +846,11 @@ def _get_ctxt():
def _get_matchmaker(*args, **kwargs):
global matchmaker
if not matchmaker:
matchmaker = importutils.import_object(
CONF.rpc_zmq_matchmaker, *args, **kwargs)
mm = CONF.rpc_zmq_matchmaker
if mm.endswith('matchmaker.MatchMakerRing'):
mm.replace('matchmaker', 'matchmaker_ring')
LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
' %(new)s instead') % dict(
orig=CONF.rpc_zmq_matchmaker, new=mm))
matchmaker = importutils.import_object(mm, *args, **kwargs)
return matchmaker

View File

@ -19,8 +19,6 @@ return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import contextlib
import itertools
import json
import eventlet
from oslo.config import cfg
@ -30,10 +28,6 @@ from quantum.openstack.common import log as logging
matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('matchmaker_ringfile',
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
cfg.IntOpt('matchmaker_heartbeat_freq',
default=300,
help='Heartbeat frequency'),
@ -236,7 +230,8 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host)))
LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
{'key': key, 'host': host})
def start_heartbeat(self):
"""
@ -245,7 +240,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
yielding for CONF.matchmaker_heartbeat_freq seconds
between iterations.
"""
if len(self.hosts) == 0:
if not self.hosts:
raise MatchMakerException(
_("Register before starting heartbeat."))
@ -304,67 +299,6 @@ class StubExchange(Exchange):
return [(key, None)]
class RingExchange(Exchange):
"""
Match Maker where hosts are loaded from a static file containing
a hashmap (JSON formatted).
__init__ takes optional ring dictionary argument, otherwise
loads the ringfile from CONF.mathcmaker_ringfile.
"""
def __init__(self, ring=None):
super(RingExchange, self).__init__()
if ring:
self.ring = ring
else:
fh = open(CONF.matchmaker_ringfile, 'r')
self.ring = json.load(fh)
fh.close()
self.ring0 = {}
for k in self.ring.keys():
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
if key in self.ring0:
return True
return False
class RoundRobinRingExchange(RingExchange):
"""A Topic Exchange based on a hashmap."""
def __init__(self, ring=None):
super(RoundRobinRingExchange, self).__init__(ring)
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (key, )
)
return []
host = next(self.ring0[key])
return [(key + '.' + host, host)]
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(FanoutRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "fanout~", strip it for lookup.
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
def __init__(self, host='localhost'):
@ -388,17 +322,6 @@ class DirectExchange(Exchange):
return [(key, e)]
class MatchMakerRing(MatchMakerBase):
"""
Match Maker where hosts are loaded from a static hashmap.
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
class MatchMakerLocalhost(MatchMakerBase):
"""
Match Maker where all bare topics resolve to localhost.

View File

@ -0,0 +1,114 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011-2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should except a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import itertools
import json
from oslo.config import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import matchmaker as mm
matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('ringfile',
deprecated_name='matchmaker_ringfile',
deprecated_group='DEFAULT',
default='/etc/oslo/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
]
CONF = cfg.CONF
CONF.register_opts(matchmaker_opts, 'matchmaker_ring')
LOG = logging.getLogger(__name__)
class RingExchange(mm.Exchange):
"""
Match Maker where hosts are loaded from a static file containing
a hashmap (JSON formatted).
__init__ takes optional ring dictionary argument, otherwise
loads the ringfile from CONF.mathcmaker_ringfile.
"""
def __init__(self, ring=None):
super(RingExchange, self).__init__()
if ring:
self.ring = ring
else:
fh = open(CONF.matchmaker_ring.ringfile, 'r')
self.ring = json.load(fh)
fh.close()
self.ring0 = {}
for k in self.ring.keys():
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
if key in self.ring0:
return True
return False
class RoundRobinRingExchange(RingExchange):
"""A Topic Exchange based on a hashmap."""
def __init__(self, ring=None):
super(RoundRobinRingExchange, self).__init__(ring)
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (key, )
)
return []
host = next(self.ring0[key])
return [(key + '.' + host, host)]
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(FanoutRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "fanout~", strip it for lookup.
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class MatchMakerRing(mm.MatchMakerBase):
"""
Match Maker where hosts are loaded from a static hashmap.
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(mm.DirectBinding(), mm.DirectExchange())
self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring))

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Red Hat, Inc.
# Copyright 2012-2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -23,6 +23,8 @@ For more information about rpc API version numbers, see:
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import common as rpc_common
from quantum.openstack.common.rpc import serializer as rpc_serializer
class RpcProxy(object):
@ -34,16 +36,28 @@ class RpcProxy(object):
rpc API.
"""
def __init__(self, topic, default_version):
# The default namespace, which can be overriden in a subclass.
RPC_API_NAMESPACE = None
def __init__(self, topic, default_version, version_cap=None,
serializer=None):
"""Initialize an RpcProxy.
:param topic: The topic to use for all messages.
:param default_version: The default API version to request in all
outgoing messages. This can be overridden on a per-message
basis.
:param version_cap: Optionally cap the maximum version used for sent
messages.
:param serializer: Optionaly (de-)serialize entities with a
provided helper.
"""
self.topic = topic
self.default_version = default_version
self.version_cap = version_cap
if serializer is None:
serializer = rpc_serializer.NoOpSerializer()
self.serializer = serializer
super(RpcProxy, self).__init__()
def _set_version(self, msg, vers):
@ -52,7 +66,11 @@ class RpcProxy(object):
:param msg: The message having a version added to it.
:param vers: The version number to add to the message.
"""
msg['version'] = vers if vers else self.default_version
v = vers if vers else self.default_version
if (self.version_cap and not
rpc_common.version_is_compatible(self.version_cap, v)):
raise rpc_common.RpcVersionCapError(version=self.version_cap)
msg['version'] = v
def _get_topic(self, topic):
"""Return the topic to use for a message."""
@ -62,9 +80,25 @@ class RpcProxy(object):
def make_namespaced_msg(method, namespace, **kwargs):
return {'method': method, 'namespace': namespace, 'args': kwargs}
@staticmethod
def make_msg(method, **kwargs):
return RpcProxy.make_namespaced_msg(method, None, **kwargs)
def make_msg(self, method, **kwargs):
return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
**kwargs)
def _serialize_msg_args(self, context, kwargs):
"""Helper method called to serialize message arguments.
This calls our serializer on each argument, returning a new
set of args that have been serialized.
:param context: The request context
:param kwargs: The arguments to serialize
:returns: A new set of serialized arguments
"""
new_kwargs = dict()
for argname, arg in kwargs.iteritems():
new_kwargs[argname] = self.serializer.serialize_entity(context,
arg)
return new_kwargs
def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method.
@ -81,9 +115,11 @@ class RpcProxy(object):
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
return rpc.call(context, real_topic, msg, timeout)
result = rpc.call(context, real_topic, msg, timeout)
return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
@ -104,9 +140,11 @@ class RpcProxy(object):
from the remote method as they arrive.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
return rpc.multicall(context, real_topic, msg, timeout)
result = rpc.multicall(context, real_topic, msg, timeout)
return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
@ -124,6 +162,7 @@ class RpcProxy(object):
remote method.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast(context, self._get_topic(topic), msg)
def fanout_cast(self, context, msg, topic=None, version=None):
@ -139,6 +178,7 @@ class RpcProxy(object):
from the remote method.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast(context, self._get_topic(topic), msg)
def cast_to_server(self, context, server_params, msg, topic=None,
@ -157,6 +197,7 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
@ -175,5 +216,6 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast_to_server(context, server_params,
self._get_topic(topic), msg)

View File

@ -0,0 +1,52 @@
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Provides the definition of an RPC serialization handler"""
import abc
class Serializer(object):
"""Generic (de-)serialization definition base class"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def serialize_entity(self, context, entity):
"""Serialize something to primitive form.
:param context: Security context
:param entity: Entity to be serialized
:returns: Serialized form of entity
"""
pass
@abc.abstractmethod
def deserialize_entity(self, context, entity):
"""Deserialize something from primitive form.
:param context: Security context
:param entity: Primitive to be deserialized
:returns: Deserialized form of entity
"""
pass
class NoOpSerializer(Serializer):
"""A serializer that does nothing"""
def serialize_entity(self, context, entity):
return entity
def deserialize_entity(self, context, entity):
return entity

View File

@ -52,7 +52,7 @@ class Launcher(object):
"""
self._services = threadgroup.ThreadGroup()
eventlet_backdoor.initialize_if_enabled()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
@staticmethod
def run_service(service):
@ -72,6 +72,7 @@ class Launcher(object):
:returns: None
"""
service.backdoor_port = self.backdoor_port
self._services.add_thread(self.run_service, service)
def stop(self):

View File

@ -61,6 +61,13 @@ class ThreadGroup(object):
self.threads = []
self.timers = []
def add_dynamic_timer(self, callback, initial_delay=None,
periodic_interval_max=None, *args, **kwargs):
timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
timer.start(initial_delay=initial_delay,
periodic_interval_max=periodic_interval_max)
self.timers.append(timer)
def add_timer(self, interval, callback, initial_delay=None,
*args, **kwargs):
pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)

View File

@ -18,10 +18,15 @@
"""Provides methods needed by installation script for OpenStack development
virtual environments.
Since this script is used to bootstrap a virtualenv from the system's Python
environment, it should be kept strictly compatible with Python 2.6.
Synced in from openstack-common
"""
import argparse
from __future__ import print_function
import optparse
import os
import subprocess
import sys
@ -39,7 +44,7 @@ class InstallVenv(object):
self.project = project
def die(self, message, *args):
print >> sys.stderr, message % args
print(message % args, file=sys.stderr)
sys.exit(1)
def check_python_version(self):
@ -86,20 +91,20 @@ class InstallVenv(object):
virtual environment.
"""
if not os.path.isdir(self.venv):
print 'Creating venv...',
print('Creating venv...', end=' ')
if no_site_packages:
self.run_command(['virtualenv', '-q', '--no-site-packages',
self.venv])
else:
self.run_command(['virtualenv', '-q', self.venv])
print 'done.'
print 'Installing pip in venv...',
print('done.')
print('Installing pip in venv...', end=' ')
if not self.run_command(['tools/with_venv.sh', 'easy_install',
'pip>1.0']).strip():
self.die("Failed to install pip.")
print 'done.'
print('done.')
else:
print "venv already exists..."
print("venv already exists...")
pass
def pip_install(self, *args):
@ -108,7 +113,7 @@ class InstallVenv(object):
redirect_output=False)
def install_dependencies(self):
print 'Installing dependencies with pip (this can take a while)...'
print('Installing dependencies with pip (this can take a while)...')
# First things first, make sure our venv has the latest pip and
# distribute.
@ -131,12 +136,12 @@ class InstallVenv(object):
def parse_args(self, argv):
"""Parses command-line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--no-site-packages',
action='store_true',
help="Do not inherit packages from global Python "
"install")
return parser.parse_args(argv[1:])
parser = optparse.OptionParser()
parser.add_option('-n', '--no-site-packages',
action='store_true',
help="Do not inherit packages from global Python "
"install")
return parser.parse_args(argv[1:])[0]
class Distro(InstallVenv):
@ -150,12 +155,12 @@ class Distro(InstallVenv):
return
if self.check_cmd('easy_install'):
print 'Installing virtualenv via easy_install...',
print('Installing virtualenv via easy_install...', end=' ')
if self.run_command(['easy_install', 'virtualenv']):
print 'Succeeded'
print('Succeeded')
return
else:
print 'Failed'
print('Failed')
self.die('ERROR: virtualenv not found.\n\n%s development'
' requires virtualenv, please install it using your'
@ -180,10 +185,6 @@ class Fedora(Distro):
return self.run_command_with_code(['rpm', '-q', pkg],
check_exit_code=False)[1] == 0
def yum_install(self, pkg, **kwargs):
print "Attempting to install '%s' via yum" % pkg
self.run_command(['sudo', 'yum', 'install', '-y', pkg], **kwargs)
def apply_patch(self, originalfile, patchfile):
self.run_command(['patch', '-N', originalfile, patchfile],
check_exit_code=False)
@ -193,7 +194,7 @@ class Fedora(Distro):
return
if not self.check_pkg('python-virtualenv'):
self.yum_install('python-virtualenv', check_exit_code=False)
self.die("Please install 'python-virtualenv'.")
super(Fedora, self).install_virtualenv()
@ -206,12 +207,13 @@ class Fedora(Distro):
This can be removed when the fix is applied upstream.
Nova: https://bugs.launchpad.net/nova/+bug/884915
Upstream: https://bitbucket.org/which_linden/eventlet/issue/89
Upstream: https://bitbucket.org/eventlet/eventlet/issue/89
RHEL: https://bugzilla.redhat.com/958868
"""
# Install "patch" program if it's not there
if not self.check_pkg('patch'):
self.yum_install('patch')
self.die("Please install 'patch'.")
# Apply the eventlet patch
self.apply_patch(os.path.join(self.venv, 'lib', self.py_version,

View File

@ -19,6 +19,7 @@ WebOb>=1.2
python-keystoneclient>=0.2.0
alembic>=0.4.1
http://tarballs.openstack.org/oslo.config/oslo.config-1.2.0a2.tar.gz#egg=oslo.config
six
# Cisco plugin dependencies
python-novaclient