Sync oslo with os/oslo-incubator

pyXX compatibility improvements
rpc-related bug fixes

Change-Id: I42576e58efced3b241476c23b4ffb5e5cf202c3e
Sergey Lukjanov 2013-09-09 23:08:14 +04:00
parent d3edcdb996
commit c944f0f322
9 changed files with 251 additions and 113 deletions

savanna/openstack/common/db/sqlalchemy/session.py View File

@@ -246,7 +246,6 @@ import os.path
import re
import time
from eventlet import greenthread
from oslo.config import cfg
import six
from sqlalchemy import exc as sqla_exc
@@ -590,14 +589,16 @@ def _add_regexp_listener(dbapi_con, con_record):
dbapi_con.create_function('regexp', 2, regexp)
def _greenthread_yield(dbapi_con, con_record):
def _thread_yield(dbapi_con, con_record):
"""Ensure other greenthreads get a chance to be executed.
If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
execute instead of time.sleep(0).
Force a context switch. With common database backends (eg MySQLdb and
sqlite), there is no implicit yield caused by network I/O since they are
implemented by C libraries that eventlet cannot monkey patch.
"""
greenthread.sleep(0)
time.sleep(0)
def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
@@ -666,7 +667,7 @@ def create_engine(sql_connection, sqlite_fk=False):
engine = sqlalchemy.create_engine(sql_connection, **engine_args)
sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield)
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(engine, 'checkout', _ping_listener)
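
The rename from _greenthread_yield to _thread_yield works because eventlet's monkey patching rebinds the stdlib sleep; a minimal sketch of that mechanism (assumes eventlet is installed; not part of the commit):

    import eventlet
    eventlet.monkey_patch()

    import time
    from eventlet import greenthread

    # After monkey_patch(), the stdlib name resolves to eventlet's
    # cooperative sleep, so time.sleep(0) yields to other greenthreads;
    # without monkey patching it is just a cheap native sleep. Either
    # way, session.py no longer hard-depends on eventlet.
    print(time.sleep is greenthread.sleep)  # expected True under the patcher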

savanna/openstack/common/fileutils.py View File

@@ -69,33 +69,34 @@ def read_cached_file(filename, force_reload=False):
return (reloaded, cache_info['data'])
def delete_if_exists(path):
def delete_if_exists(path, remove=os.unlink):
"""Delete a file, but ignore file not found error.
:param path: File to delete
:param remove: Optional function to remove passed path
"""
try:
os.unlink(path)
remove(path)
except OSError as e:
if e.errno == errno.ENOENT:
return
else:
if e.errno != errno.ENOENT:
raise
@contextlib.contextmanager
def remove_path_on_error(path):
def remove_path_on_error(path, remove=delete_if_exists):
"""Protect code that wants to operate on PATH atomically.
Any exception will cause PATH to be removed.
:param path: File to work with
:param remove: Optional function to remove passed path
"""
try:
yield
except Exception:
with excutils.save_and_reraise_exception():
delete_if_exists(path)
remove(path)
def file_open(*args, **kwargs):
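
A hedged usage sketch for the new remove hooks above (the file paths and the populate() setup step are illustrative, not from the commit):

    import shutil

    from savanna.openstack.common import fileutils

    # Default behaviour is unchanged: unlink, swallowing ENOENT.
    fileutils.delete_if_exists('/tmp/example.txt')

    # The new parameter lets the same helper remove directories.
    fileutils.delete_if_exists('/tmp/example-dir', remove=shutil.rmtree)

    # And remove_path_on_error can now clean up non-files on failure.
    with fileutils.remove_path_on_error(
            '/tmp/example-dir',
            remove=lambda p: fileutils.delete_if_exists(p, shutil.rmtree)):
        populate('/tmp/example-dir')  # hypothetical setup step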

savanna/openstack/common/jsonutils.py View File

@@ -38,8 +38,11 @@ import functools
import inspect
import itertools
import json
import types
import xmlrpclib
try:
import xmlrpclib
except ImportError:
# NOTE(jd): xmlrpclib is not shipped with Python 3
xmlrpclib = None
import six
@@ -54,7 +57,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
inspect.isabstract]
_simple_types = (types.NoneType, int, basestring, bool, float, long)
_simple_types = (six.string_types + six.integer_types
+ (type(None), bool, float))
def to_primitive(value, convert_instances=False, convert_datetime=True,
@@ -126,7 +130,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly
# handled
if isinstance(value, xmlrpclib.DateTime):
if xmlrpclib and isinstance(value, xmlrpclib.DateTime):
value = datetime.datetime(*tuple(value.timetuple())[:6])
if convert_datetime and isinstance(value, datetime.datetime):
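
The _simple_types rewrite is the heart of the compatibility change in this file; a standalone sketch of what the six-based tuple expands to (mirrors the new code exactly):

    import six

    # Python 2: (basestring, int, long, NoneType, bool, float)
    # Python 3: (str, int, NoneType, bool, float)
    _simple_types = (six.string_types + six.integer_types
                     + (type(None), bool, float))

    for sample in ('text', 42, None, True, 3.14):
        assert isinstance(sample, _simple_types)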

savanna/openstack/common/lockutils.py View File

@@ -20,10 +20,10 @@ import contextlib
import errno
import functools
import os
import threading
import time
import weakref
from eventlet import semaphore
from oslo.config import cfg
from savanna.openstack.common import fileutils
@@ -137,7 +137,8 @@ _semaphores = weakref.WeakValueDictionary()
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
This function yields a `semaphore.Semaphore` instance unless external is
This function yields a `threading.Semaphore` instance (if we don't use
eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
True, in which case, it'll yield an InterProcessLock instance.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
@@ -155,7 +156,7 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None):
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
sem = _semaphores.get(name, threading.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
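
The switch to threading.Semaphore relies on the same monkey-patching trick as the session.py change; a sketch (assumes eventlet is installed; not part of the commit):

    import eventlet
    eventlet.monkey_patch()

    import threading

    # Green semaphore when monkey patched, native one otherwise; the
    # acquire/release protocol is identical, so lock() works in both modes.
    sem = threading.Semaphore()
    with sem:
        pass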

savanna/openstack/common/rpc/amqp.py View File

@@ -364,22 +364,43 @@ class CallbackWrapper(_ThreadPoolWithWait):
Allows it to be invoked in a green thread.
"""
def __init__(self, conf, callback, connection_pool):
def __init__(self, conf, callback, connection_pool,
wait_for_consumers=False):
"""Initiates CallbackWrapper object.
:param conf: cfg.CONF instance
:param callback: a callable (probably a function)
:param connection_pool: connection pool as returned by
get_connection_pool()
:param wait_for_consumers: wait for all green threads to
complete and raise the last
caught exception, if any.
"""
super(CallbackWrapper, self).__init__(
conf=conf,
connection_pool=connection_pool,
)
self.callback = callback
self.wait_for_consumers = wait_for_consumers
self.exc_info = None
def _wrap(self, message_data, **kwargs):
"""Wrap the callback invocation to catch exceptions.
"""
try:
self.callback(message_data, **kwargs)
except Exception:
self.exc_info = sys.exc_info()
def __call__(self, message_data):
self.pool.spawn_n(self.callback, message_data)
self.exc_info = None
self.pool.spawn_n(self._wrap, message_data)
if self.wait_for_consumers:
self.pool.waitall()
if self.exc_info:
raise self.exc_info[1], None, self.exc_info[2]
class ProxyCallback(_ThreadPoolWithWait):
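
The three-argument raise above is Python-2-only syntax. Given the commit's pyXX theme, a version-portable equivalent, shown here only as a sketch, would be six.reraise:

    import sys

    import six

    def rethrow(exc_info):
        # Same effect as `raise exc_info[1], None, exc_info[2]` on
        # Python 2: re-raise the saved exception with its traceback.
        six.reraise(*exc_info)

    try:
        1 / 0
    except Exception:
        saved = sys.exc_info()

    # rethrow(saved)  # would raise ZeroDivisionError, original traceback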

savanna/openstack/common/rpc/impl_kombu.py View File

@@ -783,6 +783,7 @@ class Connection(object):
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
wait_for_consumers=not ack_on_error
)
self.proxy_callbacks.append(callback_wrapper)
self.declare_topic_consumer(

savanna/openstack/common/rpc/impl_qpid.py View File

@@ -67,6 +67,17 @@ qpid_opts = [
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
help='Disable Nagle algorithm'),
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
# this file could probably use some additional refactoring so that the
# differences between each version are split into different classes.
cfg.IntOpt('qpid_topology_version',
default=1,
help="The qpid topology version to use. Version 1 is what "
"was originally used by impl_qpid. Version 2 includes "
"some backwards-incompatible changes that allow broker "
"federation to work. Users should update to version 2 "
"when they are able to take everything down, as it "
"requires a clean break."),
]
cfg.CONF.register_opts(qpid_opts)
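
For reference, a minimal standalone sketch of registering and reading the new option with oslo.config (option list trimmed; values illustrative):

    from oslo.config import cfg

    opts = [cfg.IntOpt('qpid_topology_version', default=1)]
    conf = cfg.ConfigOpts()
    conf.register_opts(opts)
    conf([])  # parse an empty command line; a config file could override

    print(conf.qpid_topology_version)  # 1 unless overridden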
@@ -74,10 +85,17 @@ cfg.CONF.register_opts(qpid_opts)
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
def raise_invalid_topology_version(conf):
msg = (_("Invalid value for qpid_topology_version: %d") %
conf.qpid_topology_version)
LOG.error(msg)
raise Exception(msg)
class ConsumerBase(object):
"""Consumer base class."""
def __init__(self, session, callback, node_name, node_opts,
def __init__(self, conf, session, callback, node_name, node_opts,
link_name, link_opts):
"""Declare a queue on an amqp session.
@@ -95,26 +113,38 @@ class ConsumerBase(object):
self.receiver = None
self.session = None
        addr_opts = {
            "create": "always",
            "node": {
                "type": "topic",
                "x-declare": {
                    "durable": True,
                    "auto-delete": True,
                },
            },
            "link": {
                "name": link_name,
                "durable": True,
                "x-declare": {
                    "durable": False,
                    "auto-delete": True,
                    "exclusive": False,
                },
            },
        }
        addr_opts["node"]["x-declare"].update(node_opts)
        if conf.qpid_topology_version == 1:
            addr_opts = {
                "create": "always",
                "node": {
                    "type": "topic",
                    "x-declare": {
                        "durable": True,
                        "auto-delete": True,
                    },
                },
                "link": {
                    "name": link_name,
                    "durable": True,
                    "x-declare": {
                        "durable": False,
                        "auto-delete": True,
                        "exclusive": False,
                    },
                },
            }
            addr_opts["node"]["x-declare"].update(node_opts)
        elif conf.qpid_topology_version == 2:
            addr_opts = {
                "link": {
                    "x-declare": {
                        "auto-delete": True,
                    },
                },
            }
        else:
            raise_invalid_topology_version(conf)
        addr_opts["link"]["x-declare"].update(link_opts)
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
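
Concretely, the two branches yield quite different address strings; a standalone sketch using stdlib json in place of jsonutils (exchange and topic names are made up):

    import json

    link_opts = {"durable": True, "auto-delete": True}

    # version 1: custom exchange, full node and link declare options
    v1 = {"create": "always",
          "node": {"type": "topic", "x-declare": {"durable": True}},
          "link": {"x-declare": {"durable": False}}}
    v1["link"]["x-declare"].update(link_opts)
    print("openstack/compute ; %s" % json.dumps(v1))

    # version 2: standard amq.topic exchange, link options only
    v2 = {"link": {"x-declare": {"auto-delete": True}}}
    v2["link"]["x-declare"].update(link_opts)
    print("amq.topic/topic/openstack/compute ; %s" % json.dumps(v2))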
@@ -181,16 +211,24 @@ class DirectConsumer(ConsumerBase):
'callback' is the callback to call when messages are received
"""
super(DirectConsumer, self).__init__(
session, callback,
"%s/%s" % (msg_id, msg_id),
{"type": "direct"},
msg_id,
{
"auto-delete": conf.amqp_auto_delete,
"exclusive": True,
"durable": conf.amqp_durable_queues,
})
link_opts = {
"auto-delete": conf.amqp_auto_delete,
"exclusive": True,
"durable": conf.amqp_durable_queues,
}
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (msg_id, msg_id)
node_opts = {"type": "direct"}
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id
node_opts = {}
else:
raise_invalid_topology_version(conf)
super(DirectConsumer, self).__init__(conf, session, callback,
node_name, node_opts, msg_id,
link_opts)
class TopicConsumer(ConsumerBase):
@@ -208,14 +246,20 @@ class TopicConsumer(ConsumerBase):
"""
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(
session, callback,
"%s/%s" % (exchange_name, topic),
{}, name or topic,
{
"auto-delete": conf.amqp_auto_delete,
"durable": conf.amqp_durable_queues,
})
link_opts = {
"auto-delete": conf.amqp_auto_delete,
"durable": conf.amqp_durable_queues,
}
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version(conf)
super(TopicConsumer, self).__init__(conf, session, callback, node_name,
{}, name or topic, link_opts)
class FanoutConsumer(ConsumerBase):
@@ -230,12 +274,22 @@ class FanoutConsumer(ConsumerBase):
"""
self.conf = conf
super(FanoutConsumer, self).__init__(
session, callback,
"%s_fanout" % topic,
{"durable": False, "type": "fanout"},
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True})
link_opts = {"exclusive": True}
if conf.qpid_topology_version == 1:
node_name = "%s_fanout" % topic
node_opts = {"durable": False, "type": "fanout"}
link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
link_name = ""
else:
raise_invalid_topology_version(conf)
super(FanoutConsumer, self).__init__(conf, session, callback,
node_name, node_opts, link_name,
link_opts)
def reconnect(self, session):
topic = self.get_node_name().rpartition('_fanout')[0]
@@ -253,29 +307,34 @@ class FanoutConsumer(ConsumerBase):
class Publisher(object):
"""Base Publisher class."""
def __init__(self, session, node_name, node_opts=None):
def __init__(self, conf, session, node_name, node_opts=None):
"""Init the Publisher class with the exchange_name, routing_key,
and other options
"""
self.sender = None
self.session = session
        addr_opts = {
            "create": "always",
            "node": {
                "type": "topic",
                "x-declare": {
                    "durable": False,
                    # auto-delete isn't implemented for exchanges in qpid,
                    # but put in here anyway
                    "auto-delete": True,
                },
            },
        }
        if node_opts:
            addr_opts["node"]["x-declare"].update(node_opts)
        self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
        if conf.qpid_topology_version == 1:
            addr_opts = {
                "create": "always",
                "node": {
                    "type": "topic",
                    "x-declare": {
                        "durable": False,
                        # auto-delete isn't implemented for exchanges in qpid,
                        # but put in here anyway
                        "auto-delete": True,
                    },
                },
            }
            if node_opts:
                addr_opts["node"]["x-declare"].update(node_opts)
            self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
        elif conf.qpid_topology_version == 2:
            self.address = node_name
        else:
            raise_invalid_topology_version(conf)
self.reconnect(session)
@@ -319,8 +378,18 @@ class DirectPublisher(Publisher):
"""Publisher class for 'direct'."""
def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id,
{"type": "direct"})
if conf.qpid_topology_version == 1:
node_name = msg_id
node_opts = {"type": "direct"}
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id
node_opts = {}
else:
raise_invalid_topology_version(conf)
super(DirectPublisher, self).__init__(conf, session, node_name,
node_opts)
class TopicPublisher(Publisher):
@@ -329,8 +398,15 @@ class TopicPublisher(Publisher):
"""init a 'topic' publisher.
"""
exchange_name = rpc_amqp.get_control_exchange(conf)
super(TopicPublisher, self).__init__(session,
"%s/%s" % (exchange_name, topic))
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version(conf)
super(TopicPublisher, self).__init__(conf, session, node_name)
class FanoutPublisher(Publisher):
@@ -338,9 +414,18 @@ class FanoutPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'fanout' publisher.
"""
super(FanoutPublisher, self).__init__(
session,
"%s_fanout" % topic, {"type": "fanout"})
if conf.qpid_topology_version == 1:
node_name = "%s_fanout" % topic
node_opts = {"type": "fanout"}
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
else:
raise_invalid_topology_version(conf)
super(FanoutPublisher, self).__init__(conf, session, node_name,
node_opts)
class NotifyPublisher(Publisher):
@@ -349,9 +434,17 @@ class NotifyPublisher(Publisher):
"""init a 'topic' publisher.
"""
exchange_name = rpc_amqp.get_control_exchange(conf)
super(NotifyPublisher, self).__init__(session,
"%s/%s" % (exchange_name, topic),
{"durable": True})
node_opts = {"durable": True}
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version(conf)
super(NotifyPublisher, self).__init__(conf, session, node_name,
node_opts)
class Connection(object):
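
Pulling the consumer and publisher branches together, a compact sketch of the version 1 to version 2 node-name mapping (exchange, topic and msg_id values are illustrative):

    def node_name(kind, version, exchange='openstack', topic='compute',
                  msg_id='abc123'):
        # Condensed from the Direct/Topic/Fanout/Notify classes above;
        # note version 1 direct consumers use "msg_id/msg_id" while
        # direct publishers use the bare msg_id.
        v1 = {'direct': '%s/%s' % (msg_id, msg_id),
              'topic': '%s/%s' % (exchange, topic),
              'fanout': '%s_fanout' % topic}
        v2 = {'direct': 'amq.direct/%s' % msg_id,
              'topic': 'amq.topic/topic/%s/%s' % (exchange, topic),
              'fanout': 'amq.topic/fanout/%s' % topic}
        return (v1 if version == 1 else v2)[kind]

    print(node_name('fanout', 1))  # compute_fanout
    print(node_name('fanout', 2))  # amq.topic/fanout/compute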
@@ -665,6 +758,7 @@ class Connection(object):
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
wait_for_consumers=not ack_on_error
)
self.proxy_callbacks.append(callback_wrapper)

savanna/openstack/common/service.py View File

@@ -43,6 +43,29 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def _sighup_supported():
return hasattr(signal, 'SIGHUP')
def _is_sighup(signo):
return _sighup_supported() and signo == signal.SIGHUP
def _signo_to_signame(signo):
signals = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}
if _sighup_supported():
signals[signal.SIGHUP] = 'SIGHUP'
return signals[signo]
def _set_signals_handler(handler):
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
if _sighup_supported():
signal.signal(signal.SIGHUP, handler)
class Launcher(object):
"""Launch one or more services and wait for them to complete."""
@@ -100,16 +123,11 @@ class SignalExit(SystemExit):
class ServiceLauncher(Launcher):
def _handle_signal(self, signo, frame):
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGHUP, signal.SIG_DFL)
_set_signals_handler(signal.SIG_DFL)
raise SignalExit(signo)
def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGHUP, self._handle_signal)
_set_signals_handler(self._handle_signal)
def _wait_for_exit_or_signal(self):
status = None
@@ -121,9 +139,7 @@ class ServiceLauncher(Launcher):
try:
super(ServiceLauncher, self).wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[exc.signo]
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
@@ -144,7 +160,7 @@ class ServiceLauncher(Launcher):
while True:
self.handle_signal()
status, signo = self._wait_for_exit_or_signal()
if signo != signal.SIGHUP:
if not _is_sighup(signo):
return status
self.restart()
@@ -167,18 +183,14 @@ class ProcessLauncher(object):
self.handle_signal()
def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGHUP, self._handle_signal)
_set_signals_handler(self._handle_signal)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
self.running = False
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGHUP, signal.SIG_DFL)
_set_signals_handler(signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
@@ -200,7 +212,8 @@ class ProcessLauncher(object):
raise SignalExit(signal.SIGHUP)
signal.signal(signal.SIGTERM, _sigterm)
signal.signal(signal.SIGHUP, _sighup)
if _sighup_supported():
signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -211,9 +224,7 @@ class ProcessLauncher(object):
try:
launcher.wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[exc.signo]
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
@@ -269,7 +280,7 @@ class ProcessLauncher(object):
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(launcher)
if signo != signal.SIGHUP:
if not _is_sighup(signo):
break
launcher.restart()
@@ -339,11 +350,9 @@ class ProcessLauncher(object):
self.handle_signal()
self._respawn_children()
if self.sigcaught:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
signame = _signo_to_signame(self.sigcaught)
LOG.info(_('Caught %s, stopping children'), signame)
if self.sigcaught != signal.SIGHUP:
if not _is_sighup(self.sigcaught):
break
for pid in self.children:

savanna/openstack/common/timeutils.py View File

@@ -21,6 +21,7 @@ Time related utilities and helper functions.
import calendar
import datetime
import time
import iso8601
import six
@@ -90,6 +91,11 @@ def is_newer_than(after, seconds):
def utcnow_ts():
"""Timestamp version of our utcnow function."""
if utcnow.override_time is None:
# NOTE(kgriffs): This is several times faster
# than going through calendar.timegm(...)
return int(time.time())
return calendar.timegm(utcnow().timetuple())
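
A sketch of why the fast path is valid: when no test override is active, both expressions compute whole seconds since the epoch in UTC (agreement shown modulo a clock tick between the two calls):

    import calendar
    import time
    from datetime import datetime

    fast = int(time.time())
    slow = calendar.timegm(datetime.utcnow().timetuple())
    print(abs(fast - slow) <= 1)  # True barring a tick between calls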