Update openstack.common.

The following things got updated mainly:
- notifier: removed deprecated grizzly features.
- rpc: enabled message envelope.

Change-Id: I830f15363d21bcee6e93771f9457408d2adff2d8
This commit is contained in:
Lianhao Lu
2013-04-07 10:51:28 +08:00
parent b4097dfc2a
commit b9b1834cc0
9 changed files with 62 additions and 30 deletions

View File

@@ -46,5 +46,5 @@ def install(domain):
NOVA_LOCALEDIR).
"""
gettext.install(domain,
os.environ.get(domain.upper() + '_LOCALEDIR'),
localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
unicode=True)

View File

@@ -30,7 +30,6 @@ LOG = logging.getLogger(__name__)
notifier_opts = [
cfg.MultiStrOpt('notification_driver',
default=[],
deprecated_name='list_notifier_drivers',
help='Driver or drivers to handle sending notifications'),
cfg.StrOpt('default_notification_level',
default='INFO',

View File

@@ -495,7 +495,6 @@ class MulticallProxyWaiter(object):
data = self._dataqueue.get(timeout=self._timeout)
result = self._process_data(data)
except queue.Empty:
LOG.exception(_('Timed out waiting for RPC response.'))
self.done()
raise rpc_common.Timeout()
except Exception:
@@ -662,7 +661,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:
msg = rpc_common.serialize_msg(msg, force_envelope=True)
msg = rpc_common.serialize_msg(msg)
conn.notify_send(topic, msg)

View File

@@ -70,10 +70,6 @@ _VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
# TODO(russellb) Turn this on after Grizzly.
_SEND_RPC_ENVELOPE = False
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
@@ -122,7 +118,25 @@ class Timeout(RPCException):
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
message = _("Timeout while waiting on RPC response.")
message = _('Timeout while waiting on RPC response - '
'topic: "%(topic)s", RPC method: "%(method)s" '
'info: "%(info)s"')
def __init__(self, info=None, topic=None, method=None):
"""
:param info: Extra info to convey to the user
:param topic: The topic that the rpc call was sent to
:param rpc_method_name: The name of the rpc method being
called
"""
self.info = info
self.topic = topic
self.method = method
super(Timeout, self).__init__(
None,
info=info or _('<unknown>'),
topic=topic or _('<unknown>'),
method=method or _('<unknown>'))
class DuplicateMessageError(RPCException):
@@ -325,7 +339,7 @@ def deserialize_remote_exception(conf, data):
if not issubclass(klass, Exception):
raise TypeError("Can only deserialize Exceptions")
failure = klass(**failure.get('kwargs', {}))
failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
except (AttributeError, TypeError, ImportError):
return RemoteError(name, failure.get('message'), trace)
@@ -441,10 +455,7 @@ def version_is_compatible(imp_version, version):
return True
def serialize_msg(raw_msg, force_envelope=False):
if not _SEND_RPC_ENVELOPE and not force_envelope:
return raw_msg
def serialize_msg(raw_msg):
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,

View File

@@ -40,8 +40,8 @@ qpid_opts = [
cfg.StrOpt('qpid_hostname',
default='localhost',
help='Qpid broker hostname'),
cfg.StrOpt('qpid_port',
default='5672',
cfg.IntOpt('qpid_port',
default=5672,
help='Qpid broker port'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
@@ -320,7 +320,7 @@ class Connection(object):
# Reconnection is done by self.reconnect()
self.connection.reconnect = False
self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.transport = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
def _register_consumer(self, consumer):

View File

@@ -221,7 +221,7 @@ class ZmqClient(object):
def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
if not envelope:
self.outq.send(map(bytes,
(msg_id, topic, 'cast', _serialize(data))))
return
@@ -295,11 +295,16 @@ class InternalContext(object):
def reply(self, ctx, proxy,
msg_id=None, context=None, topic=None, msg=None):
"""Reply to a casted call."""
# Our real method is curried into msg['args']
# NOTE(ewindisch): context kwarg exists for Grizzly compat.
# this may be able to be removed earlier than
# 'I' if ConsumerBase.process were refactored.
if type(msg) is list:
payload = msg[-1]
else:
payload = msg
child_ctx = RpcContext.unmarshal(msg[0])
response = ConsumerBase.normalize_reply(
self._get_response(child_ctx, proxy, topic, msg[1]),
self._get_response(ctx, proxy, topic, payload),
ctx.replies)
LOG.debug(_("Sending reply"))
@@ -685,8 +690,8 @@ def _call(addr, context, topic, msg, timeout=None,
'method': '-reply',
'args': {
'msg_id': msg_id,
'context': mcontext,
'topic': reply_topic,
# TODO(ewindisch): safe to remove mcontext in I.
'msg': [mcontext, msg]
}
}

View File

@@ -35,10 +35,10 @@ matchmaker_opts = [
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
cfg.IntOpt('matchmaker_heartbeat_freq',
default='300',
default=300,
help='Heartbeat frequency'),
cfg.IntOpt('matchmaker_heartbeat_ttl',
default='600',
default=600,
help='Heartbeat time-to-live.'),
]

View File

@@ -68,16 +68,21 @@ class RpcProxy(object):
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
:param version: (Optional) Override the requested API version in this
message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
:param version: (Optional) Override the requested API version in this
message.
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
return rpc.call(context, self._get_topic(topic), msg, timeout)
real_topic = self._get_topic(topic)
try:
return rpc.call(context, real_topic, msg, timeout)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
def multicall(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.multicall() a remote method.
@@ -85,17 +90,22 @@ class RpcProxy(object):
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
:param version: (Optional) Override the requested API version in this
message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
:param version: (Optional) Override the requested API version in this
message.
:returns: An iterator that lets you process each of the returned values
from the remote method as they arrive.
"""
self._set_version(msg, version)
return rpc.multicall(context, self._get_topic(topic), msg, timeout)
real_topic = self._get_topic(topic)
try:
return rpc.multicall(context, real_topic, msg, timeout)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
def cast(self, context, msg, topic=None, version=None):
"""rpc.cast() a remote method.

View File

@@ -171,6 +171,14 @@ def generate_authors():
" log --format='%aN <%aE>' | sort -u | "
"egrep -v '" + jenkins_email + "'")
changelog = _run_shell_command(git_log_cmd)
signed_cmd = ("git --git-dir=" + git_dir +
" log | grep -i Co-authored-by: | sort -u")
signed_entries = _run_shell_command(signed_cmd)
if signed_entries:
new_entries = "\n".join(
[signed.split(":", 1)[1].strip()
for signed in signed_entries.split("\n") if signed])
changelog = "\n".join((changelog, new_entries))
mailmap = _parse_git_mailmap(git_dir)
with open(new_authors, 'w') as new_authors_fh:
new_authors_fh.write(canonicalize_emails(changelog, mailmap))