openstack/common : rebase to latest oslo

Rebased to latest oslo, now at 8837013, note a few interfaces
changed requiring a some tweaks to the heat code

Change-Id: I7dfc634b7c459edebca19dee522b396e3736aecc
Signed-off-by: Steven Hardy <shardy@redhat.com>
This commit is contained in:
Steven Hardy 2013-01-18 11:26:01 +00:00
parent 54f1b7b846
commit 55c90321b4
22 changed files with 562 additions and 439 deletions

View File

@ -637,8 +637,8 @@ Commands:
""" """
deferred_string = version.deferred_version_string(prefix='%prog ') version_string = version.version_string()
oparser = optparse.OptionParser(version=str(deferred_string), oparser = optparse.OptionParser(version=version_string,
usage=usage.strip()) usage=usage.strip())
create_options(oparser) create_options(oparser)
(opts, cmd, args) = parse_options(oparser, sys.argv[1:]) (opts, cmd, args) = parse_options(oparser, sys.argv[1:])

View File

@ -255,8 +255,8 @@ Commands:
metric-put-data Publish data-point for specified metric metric-put-data Publish data-point for specified metric
""" """
deferred_string = version.deferred_version_string(prefix='%prog ') version_string = version.version_string()
oparser = optparse.OptionParser(version=str(deferred_string), oparser = optparse.OptionParser(version=version_string,
usage=usage.strip()) usage=usage.strip())
create_options(oparser) create_options(oparser)
(opts, cmd, args) = parse_options(oparser, sys.argv[1:]) (opts, cmd, args) = parse_options(oparser, sys.argv[1:])

View File

@ -28,6 +28,7 @@ from eventlet.green import socket
from heat.common import wsgi from heat.common import wsgi
from heat.openstack.common import cfg from heat.openstack.common import cfg
from heat.openstack.common import rpc
DEFAULT_PORT = 8000 DEFAULT_PORT = 8000
@ -102,9 +103,6 @@ rpc_opts = [
help='Name of the engine node. ' help='Name of the engine node. '
'This can be an opaque identifier.' 'This can be an opaque identifier.'
'It is not necessarily a hostname, FQDN, or IP address.'), 'It is not necessarily a hostname, FQDN, or IP address.'),
cfg.StrOpt('control_exchange',
default='heat',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.StrOpt('engine_topic', cfg.StrOpt('engine_topic',
default='engine', default='engine',
help='the topic engine nodes listen on')] help='the topic engine nodes listen on')]
@ -113,6 +111,7 @@ rpc_opts = [
def register_api_opts(): def register_api_opts():
cfg.CONF.register_opts(bind_opts) cfg.CONF.register_opts(bind_opts)
cfg.CONF.register_opts(rpc_opts) cfg.CONF.register_opts(rpc_opts)
rpc.set_defaults(control_exchange='heat')
def register_engine_opts(): def register_engine_opts():
@ -120,6 +119,7 @@ def register_engine_opts():
cfg.CONF.register_opts(db_opts) cfg.CONF.register_opts(db_opts)
cfg.CONF.register_opts(service_opts) cfg.CONF.register_opts(service_opts)
cfg.CONF.register_opts(rpc_opts) cfg.CONF.register_opts(rpc_opts)
rpc.set_defaults(control_exchange='heat')
def setup_logging(): def setup_logging():

View File

@ -63,20 +63,18 @@ class EngineService(service.Service):
# stg == "Stack Thread Groups" # stg == "Stack Thread Groups"
self.stg = {} self.stg = {}
def _start_in_thread(self, stack_id, stack_name, func, *args, **kwargs): def _start_in_thread(self, stack_id, func, *args, **kwargs):
if stack_id not in self.stg: if stack_id not in self.stg:
thr_name = '%s-%s' % (stack_name, stack_id) self.stg[stack_id] = threadgroup.ThreadGroup()
self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
self.stg[stack_id].add_thread(func, *args, **kwargs) self.stg[stack_id].add_thread(func, *args, **kwargs)
def _timer_in_thread(self, stack_id, stack_name, func, *args, **kwargs): def _timer_in_thread(self, stack_id, func, *args, **kwargs):
""" """
Define a periodic task, to be run in a separate thread, in the stack Define a periodic task, to be run in a separate thread, in the stack
threadgroups. Periodicity is cfg.CONF.periodic_interval threadgroups. Periodicity is cfg.CONF.periodic_interval
""" """
if stack_id not in self.stg: if stack_id not in self.stg:
thr_name = '%s-%s' % (stack_name, stack_id) self.stg[stack_id] = threadgroup.ThreadGroup()
self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
self.stg[stack_id].add_timer(cfg.CONF.periodic_interval, self.stg[stack_id].add_timer(cfg.CONF.periodic_interval,
func, *args, **kwargs) func, *args, **kwargs)
@ -102,9 +100,7 @@ class EngineService(service.Service):
admin_context = context.get_admin_context() admin_context = context.get_admin_context()
stacks = db_api.stack_get_all(admin_context) stacks = db_api.stack_get_all(admin_context)
for s in stacks: for s in stacks:
self._timer_in_thread(s.id, s.name, self._timer_in_thread(s.id, self._periodic_watcher_task, sid=s.id)
self._periodic_watcher_task,
sid=s.id)
@request_context @request_context
def identify_stack(self, context, stack_name): def identify_stack(self, context, stack_name):
@ -214,11 +210,10 @@ class EngineService(service.Service):
stack_id = stack.store() stack_id = stack.store()
self._start_in_thread(stack_id, stack_name, stack.create) self._start_in_thread(stack_id, stack.create)
# Schedule a periodic watcher task for this stack # Schedule a periodic watcher task for this stack
self._timer_in_thread(stack_id, stack_name, self._timer_in_thread(stack_id, self._periodic_watcher_task,
self._periodic_watcher_task,
sid=stack_id) sid=stack_id)
return dict(stack.identifier()) return dict(stack.identifier())
@ -257,9 +252,7 @@ class EngineService(service.Service):
if response: if response:
return {'Description': response} return {'Description': response}
self._start_in_thread(db_stack.id, db_stack.name, self._start_in_thread(db_stack.id, current_stack.update, updated_stack)
current_stack.update,
updated_stack)
return dict(current_stack.identifier()) return dict(current_stack.identifier())

View File

@ -236,10 +236,11 @@ in order to support a common usage pattern in OpenStack::
Positional command line arguments are supported via a 'positional' Opt Positional command line arguments are supported via a 'positional' Opt
constructor argument:: constructor argument::
>>> CONF.register_cli_opt(MultiStrOpt('bar', positional=True)) >>> conf = ConfigOpts()
>>> conf.register_cli_opt(MultiStrOpt('bar', positional=True))
True True
>>> CONF(['a', 'b']) >>> conf(['a', 'b'])
>>> CONF.bar >>> conf.bar
['a', 'b'] ['a', 'b']
It is also possible to use argparse "sub-parsers" to parse additional It is also possible to use argparse "sub-parsers" to parse additional
@ -249,10 +250,11 @@ command line arguments using the SubCommandOpt class:
... list_action = subparsers.add_parser('list') ... list_action = subparsers.add_parser('list')
... list_action.add_argument('id') ... list_action.add_argument('id')
... ...
>>> CONF.register_cli_opt(SubCommandOpt('action', handler=add_parsers)) >>> conf = ConfigOpts()
>>> conf.register_cli_opt(SubCommandOpt('action', handler=add_parsers))
True True
>>> CONF(['list', '10']) >>> conf(args=['list', '10'])
>>> CONF.action.name, CONF.action.id >>> conf.action.name, conf.action.id
('list', '10') ('list', '10')
""" """
@ -480,6 +482,13 @@ def _is_opt_registered(opts, opt):
return False return False
def set_defaults(opts, **kwargs):
for opt in opts:
if opt.dest in kwargs:
opt.default = kwargs[opt.dest]
break
class Opt(object): class Opt(object):
"""Base class for all configuration options. """Base class for all configuration options.
@ -771,7 +780,7 @@ class ListOpt(Opt):
def _get_from_config_parser(self, cparser, section): def _get_from_config_parser(self, cparser, section):
"""Retrieve the opt value as a list from ConfigParser.""" """Retrieve the opt value as a list from ConfigParser."""
return [v.split(',') for v in return [[a.strip() for a in v.split(',')] for v in
self._cparser_get_with_deprecated(cparser, section)] self._cparser_get_with_deprecated(cparser, section)]
def _get_argparse_kwargs(self, group, **kwargs): def _get_argparse_kwargs(self, group, **kwargs):
@ -1728,11 +1737,13 @@ class CommonConfigOpts(ConfigOpts):
BoolOpt('debug', BoolOpt('debug',
short='d', short='d',
default=False, default=False,
help='Print debugging output'), help='Print debugging output (set logging level to '
'DEBUG instead of default WARNING level).'),
BoolOpt('verbose', BoolOpt('verbose',
short='v', short='v',
default=False, default=False,
help='Print more verbose output'), help='Print more verbose output (set logging level to '
'INFO instead of default WARNING level).'),
] ]
logging_cli_opts = [ logging_cli_opts = [
@ -1756,11 +1767,13 @@ class CommonConfigOpts(ConfigOpts):
'Default: %(default)s'), 'Default: %(default)s'),
StrOpt('log-file', StrOpt('log-file',
metavar='PATH', metavar='PATH',
deprecated_name='logfile',
help='(Optional) Name of log file to output to. ' help='(Optional) Name of log file to output to. '
'If not set, logging will go to stdout.'), 'If not set, logging will go to stdout.'),
StrOpt('log-dir', StrOpt('log-dir',
deprecated_name='logdir',
help='(Optional) The directory to keep log files in ' help='(Optional) The directory to keep log files in '
'(will be prepended to --logfile)'), '(will be prepended to --log-file)'),
BoolOpt('use-syslog', BoolOpt('use-syslog',
default=False, default=False,
help='Use syslog for logging.'), help='Use syslog for logging.'),

View File

@ -49,19 +49,20 @@ from heat.openstack.common import notifier
log_opts = [ log_opts = [
cfg.StrOpt('logging_context_format_string', cfg.StrOpt('logging_context_format_string',
default='%(asctime)s %(levelname)s %(name)s [%(request_id)s ' default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
'%(user)s %(tenant)s] %(instance)s' '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
'%(message)s', '%(message)s',
help='format string to use for log messages with context'), help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string', cfg.StrOpt('logging_default_format_string',
default='%(asctime)s %(process)d %(levelname)s %(name)s [-]' default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
' %(instance)s%(message)s', '%(name)s [-] %(instance)s%(message)s',
help='format string to use for log messages without context'), help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix', cfg.StrOpt('logging_debug_format_suffix',
default='%(funcName)s %(pathname)s:%(lineno)d', default='%(funcName)s %(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'), help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix', cfg.StrOpt('logging_exception_prefix',
default='%(asctime)s %(process)d TRACE %(name)s %(instance)s', default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
'%(instance)s',
help='prefix each line of exception output with this format'), help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels', cfg.ListOpt('default_log_levels',
default=[ default=[
@ -289,6 +290,12 @@ def setup(product_name):
_setup_logging_from_conf(product_name) _setup_logging_from_conf(product_name)
def set_defaults(logging_context_format_string):
cfg.set_defaults(log_opts,
logging_context_format_string=
logging_context_format_string)
def _find_facility_from_conf(): def _find_facility_from_conf():
facility_names = logging.handlers.SysLogHandler.facility_names facility_names = logging.handlers.SysLogHandler.facility_names
facility = getattr(logging.handlers.SysLogHandler, facility = getattr(logging.handlers.SysLogHandler,
@ -354,10 +361,12 @@ def _setup_logging_from_conf(product_name):
datefmt=datefmt)) datefmt=datefmt))
handler.setFormatter(LegacyFormatter(datefmt=datefmt)) handler.setFormatter(LegacyFormatter(datefmt=datefmt))
if CONF.verbose or CONF.debug: if CONF.debug:
log_root.setLevel(logging.DEBUG) log_root.setLevel(logging.DEBUG)
else: elif CONF.verbose:
log_root.setLevel(logging.INFO) log_root.setLevel(logging.INFO)
else:
log_root.setLevel(logging.WARNING)
level = logging.NOTSET level = logging.NOTSET
for pair in CONF.default_log_levels: for pair in CONF.default_log_levels:

View File

@ -50,25 +50,26 @@ rpc_opts = [
default=['heat.openstack.common.exception', default=['heat.openstack.common.exception',
'nova.exception', 'nova.exception',
'cinder.exception', 'cinder.exception',
'exceptions',
], ],
help='Modules of exceptions that are permitted to be recreated' help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'), 'upon receiving exception data from an rpc call.'),
cfg.BoolOpt('fake_rabbit', cfg.BoolOpt('fake_rabbit',
default=False, default=False,
help='If passed, use a fake RabbitMQ provider'), help='If passed, use a fake RabbitMQ provider'),
# cfg.StrOpt('control_exchange',
# The following options are not registered here, but are expected to be default='openstack',
# present. The project using this library must register these options with help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
# the configuration so that project-specific defaults may be defined.
#
#cfg.StrOpt('control_exchange',
# default='nova',
# help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
] ]
cfg.CONF.register_opts(rpc_opts) cfg.CONF.register_opts(rpc_opts)
def set_defaults(control_exchange):
cfg.set_defaults(rpc_opts,
control_exchange=control_exchange)
def create_connection(new=True): def create_connection(new=True):
"""Create a connection to the message bus used for rpc. """Create a connection to the message bus used for rpc.
@ -177,17 +178,18 @@ def multicall(context, topic, msg, timeout=None):
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
def notify(context, topic, msg): def notify(context, topic, msg, envelope=False):
"""Send notification event. """Send notification event.
:param context: Information that identifies the user that has made this :param context: Information that identifies the user that has made this
request. request.
:param topic: The topic to send the notification to. :param topic: The topic to send the notification to.
:param msg: This is a dict of content of event. :param msg: This is a dict of content of event.
:param envelope: Set to True to enable message envelope for notifications.
:returns: None :returns: None
""" """
return _get_impl().notify(cfg.CONF, context, topic, msg) return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
def cleanup(): def cleanup():

View File

@ -33,7 +33,6 @@ from eventlet import greenpool
from eventlet import pools from eventlet import pools
from eventlet import semaphore from eventlet import semaphore
from heat.openstack.common import cfg
from heat.openstack.common import excutils from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _
from heat.openstack.common import local from heat.openstack.common import local
@ -150,7 +149,7 @@ class ConnectionContext(rpc_common.Connection):
def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
ending=False): ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id. """Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple. Failure should be a sys.exc_info() tuple.
@ -158,7 +157,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
""" """
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
if failure: if failure:
failure = rpc_common.serialize_remote_exception(failure) failure = rpc_common.serialize_remote_exception(failure,
log_failure)
try: try:
msg = {'result': reply, 'failure': failure} msg = {'result': reply, 'failure': failure}
@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure} 'failure': failure}
if ending: if ending:
msg['ending'] = True msg['ending'] = True
conn.direct_send(msg_id, msg) conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext): class RpcContext(rpc_common.CommonRpcContext):
@ -185,10 +185,10 @@ class RpcContext(rpc_common.CommonRpcContext):
return self.__class__(**values) return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False, def reply(self, reply=None, failure=None, ending=False,
connection_pool=None): connection_pool=None, log_failure=True):
if self.msg_id: if self.msg_id:
msg_reply(self.conf, self.msg_id, connection_pool, reply, failure, msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
ending) ending, log_failure)
if ending: if ending:
self.msg_id = None self.msg_id = None
@ -282,11 +282,21 @@ class ProxyCallback(object):
ctxt.reply(rval, None, connection_pool=self.connection_pool) ctxt.reply(rval, None, connection_pool=self.connection_pool)
# This final None tells multicall that it is done. # This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool) ctxt.reply(ending=True, connection_pool=self.connection_pool)
except rpc_common.ClientException as e:
LOG.debug(_('Expected exception during message handling (%s)') %
e._exc_info[1])
ctxt.reply(None, e._exc_info,
connection_pool=self.connection_pool,
log_failure=False)
except Exception: except Exception:
LOG.exception(_('Exception during message handling')) LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(), ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool) connection_pool=self.connection_pool)
def wait(self):
"""Wait for all callback threads to exit."""
self.pool.waitall()
class MulticallWaiter(object): class MulticallWaiter(object):
def __init__(self, conf, connection, timeout): def __init__(self, conf, connection, timeout):
@ -349,7 +359,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
# that will continue to use the connection. When it's done, # that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into # connection.close() will get called which will put it back into
# the pool # the pool
LOG.debug(_('Making asynchronous call on %s ...'), topic) LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id}) msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id)) LOG.debug(_('MSG_ID is %s') % (msg_id))
@ -358,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool) conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout) wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg) conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg) conn.topic_send(topic, rpc_common.serialize_msg(msg))
return wait_msg return wait_msg
@ -377,7 +387,7 @@ def cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous cast on %s...'), topic) LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, msg) conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool): def fanout_cast(conf, context, topic, msg, connection_pool):
@ -385,7 +395,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous fanout cast...')) LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, msg) conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool): def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
@ -393,7 +403,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False, with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn: server_params=server_params) as conn:
conn.topic_send(topic, msg) conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast_to_server(conf, context, server_params, topic, msg, def fanout_cast_to_server(conf, context, server_params, topic, msg,
@ -402,16 +412,18 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False, with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn: server_params=server_params) as conn:
conn.fanout_send(topic, msg) conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def notify(conf, context, topic, msg, connection_pool): def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
LOG.debug(_('Sending %(event_type)s on %(topic)s'), LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'), dict(event_type=msg.get('event_type'),
topic=topic)) topic=topic))
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
if envelope:
msg = rpc_common.serialize_msg(msg, force_envelope=True)
conn.notify_send(topic, msg) conn.notify_send(topic, msg)
@ -421,7 +433,4 @@ def cleanup(connection_pool):
def get_control_exchange(conf): def get_control_exchange(conf):
try: return conf.control_exchange
return conf.control_exchange
except cfg.NoSuchOptError:
return 'openstack'

View File

@ -18,8 +18,10 @@
# under the License. # under the License.
import copy import copy
import sys
import traceback import traceback
from heat.openstack.common import cfg
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils from heat.openstack.common import importutils
from heat.openstack.common import jsonutils from heat.openstack.common import jsonutils
@ -27,9 +29,50 @@ from heat.openstack.common import local
from heat.openstack.common import log as logging from heat.openstack.common import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
'''RPC Envelope Version.
This version number applies to the top level structure of messages sent out.
It does *not* apply to the message payload, which must be versioned
independently. For example, when using rpc APIs, a version number is applied
for changes to the API being exposed over rpc. This version number is handled
in the rpc proxy and dispatcher modules.
This version number applies to the message envelope that is used in the
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
The current message format (version 2.0) is very simple. It is:
{
'heat.version': <RPC Envelope Version as a String>,
'heat.message': <Application Message Payload, JSON encoded>
}
Message format version '1.0' is just considered to be the messages we sent
without a message envelope.
So, the current message envelope just includes the envelope version. It may
eventually contain additional information, such as a signature for the message
payload.
We will JSON encode the application message payload. The message envelope,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'heat.version'
_MESSAGE_KEY = 'heat.message'
# TODO(russellb) Turn this on after Grizzly.
_SEND_RPC_ENVELOPE = False
class RPCException(Exception): class RPCException(Exception):
message = _("An unknown RPC related exception occurred.") message = _("An unknown RPC related exception occurred.")
@ -90,6 +133,11 @@ class UnsupportedRpcVersion(RPCException):
"this endpoint.") "this endpoint.")
class UnsupportedRpcEnvelopeVersion(RPCException):
message = _("Specified RPC envelope version, %(version)s, "
"not supported by this endpoint.")
class Connection(object): class Connection(object):
"""A connection, returned by rpc.create_connection(). """A connection, returned by rpc.create_connection().
@ -164,8 +212,12 @@ class Connection(object):
def _safe_log(log_func, msg, msg_data): def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging.""" """Sanitizes the msg_data field before logging."""
SANITIZE = {'set_admin_password': ('new_pass',), SANITIZE = {'set_admin_password': [('args', 'new_pass')],
'run_instance': ('admin_password',), } 'run_instance': [('args', 'admin_password')],
'route_message': [('args', 'message', 'args', 'method_info',
'method_kwargs', 'password'),
('args', 'message', 'args', 'method_info',
'method_kwargs', 'admin_password')]}
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data has_context_token = '_context_auth_token' in msg_data
@ -177,14 +229,16 @@ def _safe_log(log_func, msg, msg_data):
msg_data = copy.deepcopy(msg_data) msg_data = copy.deepcopy(msg_data)
if has_method: if has_method:
method = msg_data['method'] for arg in SANITIZE.get(msg_data['method'], []):
if method in SANITIZE: try:
args_to_sanitize = SANITIZE[method] d = msg_data
for arg in args_to_sanitize: for elem in arg[:-1]:
try: d = d[elem]
msg_data['args'][arg] = "<SANITIZED>" d[arg[-1]] = '<SANITIZED>'
except KeyError: except KeyError, e:
pass LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
{'item': arg,
'err': e})
if has_context_token: if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>' msg_data['_context_auth_token'] = '<SANITIZED>'
@ -195,7 +249,7 @@ def _safe_log(log_func, msg, msg_data):
return log_func(msg, msg_data) return log_func(msg, msg_data)
def serialize_remote_exception(failure_info): def serialize_remote_exception(failure_info, log_failure=True):
"""Prepares exception data to be sent over rpc. """Prepares exception data to be sent over rpc.
Failure_info should be a sys.exc_info() tuple. Failure_info should be a sys.exc_info() tuple.
@ -203,8 +257,9 @@ def serialize_remote_exception(failure_info):
""" """
tb = traceback.format_exception(*failure_info) tb = traceback.format_exception(*failure_info)
failure = failure_info[1] failure = failure_info[1]
LOG.error(_("Returning exception %s to caller"), unicode(failure)) if log_failure:
LOG.error(tb) LOG.error(_("Returning exception %s to caller"), unicode(failure))
LOG.error(tb)
kwargs = {} kwargs = {}
if hasattr(failure, 'kwargs'): if hasattr(failure, 'kwargs'):
@ -309,3 +364,107 @@ class CommonRpcContext(object):
context.values['read_deleted'] = read_deleted context.values['read_deleted'] = read_deleted
return context return context
class ClientException(Exception):
"""This encapsulates some actual exception that is expected to be
hit by an RPC proxy object. Merely instantiating it records the
current exception information, which will be passed back to the
RPC client without exceptional logging."""
def __init__(self):
self._exc_info = sys.exc_info()
def catch_client_exception(exceptions, func, *args, **kwargs):
try:
return func(*args, **kwargs)
except Exception, e:
if type(e) in exceptions:
raise ClientException()
else:
raise
def client_exceptions(*exceptions):
"""Decorator for manager methods that raise expected exceptions.
Marking a Manager method with this decorator allows the declaration
of expected exceptions that the RPC layer should not consider fatal,
and not log as if they were generated in a real error scenario. Note
that this will cause listed exceptions to be wrapped in a
ClientException, which is used internally by the RPC layer."""
def outer(func):
def inner(*args, **kwargs):
return catch_client_exception(exceptions, func, *args, **kwargs)
return inner
return outer
def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible.
:param imp_version: The version implemented
:param version: The version requested by an incoming message.
"""
version_parts = version.split('.')
imp_version_parts = imp_version.split('.')
if int(version_parts[0]) != int(imp_version_parts[0]): # Major
return False
if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
return False
return True
def serialize_msg(raw_msg, force_envelope=False):
if not _SEND_RPC_ENVELOPE and not force_envelope:
return raw_msg
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
_MESSAGE_KEY: jsonutils.dumps(raw_msg)}
return msg
def deserialize_msg(msg):
# NOTE(russellb): Hang on to your hats, this road is about to
# get a little bumpy.
#
# Robustness Principle:
# "Be strict in what you send, liberal in what you accept."
#
# At this point we have to do a bit of guessing about what it
# is we just received. Here is the set of possibilities:
#
# 1) We received a dict. This could be 2 things:
#
# a) Inspect it to see if it looks like a standard message envelope.
# If so, great!
#
# b) If it doesn't look like a standard message envelope, it could either
# be a notification, or a message from before we added a message
# envelope (referred to as version 1.0).
# Just return the message as-is.
#
# 2) It's any other non-dict type. Just return it and hope for the best.
# This case covers return values from rpc.call() from before message
# envelopes were used. (messages to call a method were always a dict)
if not isinstance(msg, dict):
# See #2 above.
return msg
base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
if not all(map(lambda key: key in msg, base_envelope_keys)):
# See #1.b above.
return msg
# At this point we think we have the message envelope
# format we were expecting. (#1.a above)
if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
return raw_msg

View File

@ -103,21 +103,6 @@ class RpcDispatcher(object):
self.callbacks = callbacks self.callbacks = callbacks
super(RpcDispatcher, self).__init__() super(RpcDispatcher, self).__init__()
@staticmethod
def _is_compatible(mversion, version):
"""Determine whether versions are compatible.
:param mversion: The API version implemented by a callback.
:param version: The API version requested by an incoming message.
"""
version_parts = version.split('.')
mversion_parts = mversion.split('.')
if int(version_parts[0]) != int(mversion_parts[0]): # Major
return False
if int(version_parts[1]) > int(mversion_parts[1]): # Minor
return False
return True
def dispatch(self, ctxt, version, method, **kwargs): def dispatch(self, ctxt, version, method, **kwargs):
"""Dispatch a message based on a requested version. """Dispatch a message based on a requested version.
@ -139,7 +124,8 @@ class RpcDispatcher(object):
rpc_api_version = proxyobj.RPC_API_VERSION rpc_api_version = proxyobj.RPC_API_VERSION
else: else:
rpc_api_version = '1.0' rpc_api_version = '1.0'
is_compatible = self._is_compatible(rpc_api_version, version) is_compatible = rpc_common.version_is_compatible(rpc_api_version,
version)
had_compatible = had_compatible or is_compatible had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method): if not hasattr(proxyobj, method):
continue continue

View File

@ -79,6 +79,8 @@ class Consumer(object):
else: else:
res.append(rval) res.append(rval)
done.send(res) done.send(res)
except rpc_common.ClientException as e:
done.send_exception(e._exc_info[1])
except Exception as e: except Exception as e:
done.send_exception(e) done.send_exception(e)
@ -165,7 +167,7 @@ def cast(conf, context, topic, msg):
pass pass
def notify(conf, context, topic, msg): def notify(conf, context, topic, msg, envelope):
check_serialize(msg) check_serialize(msg)

View File

@ -162,7 +162,8 @@ class ConsumerBase(object):
def _callback(raw_message): def _callback(raw_message):
message = self.channel.message_to_python(raw_message) message = self.channel.message_to_python(raw_message)
try: try:
callback(message.payload) msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
message.ack() message.ack()
except Exception: except Exception:
LOG.exception(_("Failed to process message... skipping it.")) LOG.exception(_("Failed to process message... skipping it."))
@ -196,7 +197,7 @@ class DirectConsumer(ConsumerBase):
# Default options # Default options
options = {'durable': False, options = {'durable': False,
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange = kombu.entity.Exchange(name=msg_id, exchange = kombu.entity.Exchange(name=msg_id,
type='direct', type='direct',
@ -269,7 +270,7 @@ class FanoutConsumer(ConsumerBase):
options = {'durable': False, options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf), 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
durable=options['durable'], durable=options['durable'],
@ -316,7 +317,7 @@ class DirectPublisher(Publisher):
options = {'durable': False, options = {'durable': False,
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
super(DirectPublisher, self).__init__(channel, msg_id, msg_id, super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
type='direct', **options) type='direct', **options)
@ -350,7 +351,7 @@ class FanoutPublisher(Publisher):
""" """
options = {'durable': False, options = {'durable': False,
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
None, type='fanout', **options) None, type='fanout', **options)
@ -387,6 +388,7 @@ class Connection(object):
def __init__(self, conf, server_params=None): def __init__(self, conf, server_params=None):
self.consumers = [] self.consumers = []
self.consumer_thread = None self.consumer_thread = None
self.proxy_callbacks = []
self.conf = conf self.conf = conf
self.max_retries = self.conf.rabbit_max_retries self.max_retries = self.conf.rabbit_max_retries
# Try forever? # Try forever?
@ -469,7 +471,7 @@ class Connection(object):
LOG.info(_("Reconnecting to AMQP server on " LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % params) "%(hostname)s:%(port)d") % params)
try: try:
self.connection.close() self.connection.release()
except self.connection_errors: except self.connection_errors:
pass pass
# Setting this in case the next statement fails, though # Setting this in case the next statement fails, though
@ -573,12 +575,14 @@ class Connection(object):
def close(self): def close(self):
"""Close/release this connection""" """Close/release this connection"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.release() self.connection.release()
self.connection = None self.connection = None
def reset(self): def reset(self):
"""Reset a connection so it can be used again""" """Reset a connection so it can be used again"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.channel.close() self.channel.close()
self.channel = self.connection.channel() self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3 # work around 'memory' transport bug in 1.1.3
@ -644,6 +648,11 @@ class Connection(object):
pass pass
self.consumer_thread = None self.consumer_thread = None
def wait_on_proxy_callbacks(self):
"""Wait for all proxy callback threads to exit."""
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg, **kwargs): def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class""" """Send to a publisher based on the publisher class"""
@ -719,6 +728,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout: if fanout:
self.declare_fanout_consumer(topic, proxy_cb) self.declare_fanout_consumer(topic, proxy_cb)
@ -730,6 +740,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name) self.declare_topic_consumer(topic, proxy_cb, pool_name)
@ -782,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg): def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify( return rpc_amqp.notify(
conf, context, topic, msg, conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection),
envelope)
def cleanup(): def cleanup():

View File

@ -22,16 +22,18 @@ import uuid
import eventlet import eventlet
import greenlet import greenlet
import qpid.messaging
import qpid.messaging.exceptions
from heat.openstack.common import cfg from heat.openstack.common import cfg
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils from heat.openstack.common import jsonutils
from heat.openstack.common import log as logging from heat.openstack.common import log as logging
from heat.openstack.common.rpc import amqp as rpc_amqp from heat.openstack.common.rpc import amqp as rpc_amqp
from heat.openstack.common.rpc import common as rpc_common from heat.openstack.common.rpc import common as rpc_common
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
qpid_opts = [ qpid_opts = [
@ -124,7 +126,8 @@ class ConsumerBase(object):
"""Fetch the message and pass it to the callback object""" """Fetch the message and pass it to the callback object"""
message = self.receiver.fetch() message = self.receiver.fetch()
try: try:
self.callback(message.content) msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception: except Exception:
LOG.exception(_("Failed to process message... skipping it.")) LOG.exception(_("Failed to process message... skipping it."))
finally: finally:
@ -274,11 +277,22 @@ class Connection(object):
pool = None pool = None
def __init__(self, conf, server_params=None): def __init__(self, conf, server_params=None):
if not qpid_messaging:
raise ImportError("Failed to import qpid.messaging")
self.session = None self.session = None
self.consumers = {} self.consumers = {}
self.consumer_thread = None self.consumer_thread = None
self.proxy_callbacks = []
self.conf = conf self.conf = conf
if server_params and 'hostname' in server_params:
# NOTE(russellb) This enables support for cast_to_server.
server_params['qpid_hosts'] = [
'%s:%d' % (server_params['hostname'],
server_params.get('port', 5672))
]
params = { params = {
'qpid_hosts': self.conf.qpid_hosts, 'qpid_hosts': self.conf.qpid_hosts,
'username': self.conf.qpid_username, 'username': self.conf.qpid_username,
@ -294,7 +308,7 @@ class Connection(object):
def connection_create(self, broker): def connection_create(self, broker):
# Create the connection - this does not open the connection # Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(broker) self.connection = qpid_messaging.Connection(broker)
# Check if flags are set and if so set them for the connection # Check if flags are set and if so set them for the connection
# before we call open # before we call open
@ -319,7 +333,7 @@ class Connection(object):
if self.connection.opened(): if self.connection.opened():
try: try:
self.connection.close() self.connection.close()
except qpid.messaging.exceptions.ConnectionError: except qpid_exceptions.ConnectionError:
pass pass
attempt = 0 attempt = 0
@ -331,7 +345,7 @@ class Connection(object):
try: try:
self.connection_create(broker) self.connection_create(broker)
self.connection.open() self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e: except qpid_exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay) msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. " msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict "Sleeping %(delay)s seconds") % msg_dict
@ -358,8 +372,8 @@ class Connection(object):
while True: while True:
try: try:
return method(*args, **kwargs) return method(*args, **kwargs)
except (qpid.messaging.exceptions.Empty, except (qpid_exceptions.Empty,
qpid.messaging.exceptions.ConnectionError), e: qpid_exceptions.ConnectionError), e:
if error_callback: if error_callback:
error_callback(e) error_callback(e)
self.reconnect() self.reconnect()
@ -367,12 +381,14 @@ class Connection(object):
def close(self): def close(self):
"""Close/release this connection""" """Close/release this connection"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.close() self.connection.close()
self.connection = None self.connection = None
def reset(self): def reset(self):
"""Reset a connection so it can be used again""" """Reset a connection so it can be used again"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.session.close() self.session.close()
self.session = self.connection.session() self.session = self.connection.session()
self.consumers = {} self.consumers = {}
@ -397,7 +413,7 @@ class Connection(object):
"""Return an iterator that will consume from all queues/consumers""" """Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc): def _error_callback(exc):
if isinstance(exc, qpid.messaging.exceptions.Empty): if isinstance(exc, qpid_exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') % LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc)) str(exc))
raise rpc_common.Timeout() raise rpc_common.Timeout()
@ -427,6 +443,11 @@ class Connection(object):
pass pass
self.consumer_thread = None self.consumer_thread = None
def wait_on_proxy_callbacks(self):
"""Wait for all proxy callback threads to exit."""
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg): def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class""" """Send to a publisher based on the publisher class"""
@ -502,6 +523,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout: if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@ -517,6 +539,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name) name=pool_name)
@ -575,10 +598,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg): def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg, return rpc_amqp.notify(conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection),
envelope)
def cleanup(): def cleanup():

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os
import pprint import pprint
import socket import socket
import string import string
@ -22,15 +23,16 @@ import types
import uuid import uuid
import eventlet import eventlet
from eventlet.green import zmq
import greenlet import greenlet
from heat.openstack.common import cfg from heat.openstack.common import cfg
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils from heat.openstack.common import importutils
from heat.openstack.common import jsonutils from heat.openstack.common import jsonutils
from heat.openstack.common import processutils as utils
from heat.openstack.common.rpc import common as rpc_common from heat.openstack.common.rpc import common as rpc_common
zmq = importutils.try_import('eventlet.green.zmq')
# for convenience, are not modified. # for convenience, are not modified.
pformat = pprint.pformat pformat = pprint.pformat
@ -61,6 +63,10 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_contexts', default=1, cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'), help='Number of ZeroMQ contexts, defaults to 1'),
cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
help='Maximum number of ingress messages to locally buffer '
'per topic. Default is unlimited.'),
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'), help='Directory for holding IPC sockets'),
@ -70,9 +76,9 @@ zmq_opts = [
] ]
# These globals are defined in register_opts(conf), CONF = cfg.CONF
# a mandatory initialization call CONF.register_opts(zmq_opts)
CONF = None
ZMQ_CTX = None # ZeroMQ Context, must be global. ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object matchmaker = None # memoized matchmaker object
@ -107,7 +113,7 @@ class ZmqSocket(object):
""" """
def __init__(self, addr, zmq_type, bind=True, subscribe=None): def __init__(self, addr, zmq_type, bind=True, subscribe=None):
self.sock = ZMQ_CTX.socket(zmq_type) self.sock = _get_ctxt().socket(zmq_type)
self.addr = addr self.addr = addr
self.type = zmq_type self.type = zmq_type
self.subscriptions = [] self.subscriptions = []
@ -181,11 +187,15 @@ class ZmqSocket(object):
pass pass
self.subscriptions = [] self.subscriptions = []
# Linger -1 prevents lost/dropped messages
try: try:
self.sock.close(linger=-1) # Default is to linger
self.sock.close()
except Exception: except Exception:
pass # While this is a bad thing to happen,
# it would be much worse if some of the code calling this
# were to fail. For now, lets log, and later evaluate
# if we can safely raise here.
LOG.error("ZeroMQ socket could not be closed.")
self.sock = None self.sock = None
def recv(self): def recv(self):
@ -202,10 +212,14 @@ class ZmqSocket(object):
class ZmqClient(object): class ZmqClient(object):
"""Client for ZMQ sockets.""" """Client for ZMQ sockets."""
def __init__(self, addr, socket_type=zmq.PUSH, bind=False): def __init__(self, addr, socket_type=None, bind=False):
if socket_type is None:
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind) self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data): def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'), self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)]) _serialize(data)])
@ -250,7 +264,7 @@ class InternalContext(object):
"""Process a curried message and cast the result to topic.""" """Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict()) LOG.debug(_("Running func with context: %s"), ctx.to_dict())
data.setdefault('version', None) data.setdefault('version', None)
data.setdefault('args', []) data.setdefault('args', {})
try: try:
result = proxy.dispatch( result = proxy.dispatch(
@ -259,7 +273,14 @@ class InternalContext(object):
except greenlet.GreenletExit: except greenlet.GreenletExit:
# ignore these since they are just from shutdowns # ignore these since they are just from shutdowns
pass pass
except rpc_common.ClientException, e:
LOG.debug(_("Expected exception during message handling (%s)") %
e._exc_info[1])
return {'exc':
rpc_common.serialize_remote_exception(e._exc_info,
log_failure=False)}
except Exception: except Exception:
LOG.error(_("Exception during message handling"))
return {'exc': return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())} rpc_common.serialize_remote_exception(sys.exc_info())}
@ -314,7 +335,7 @@ class ConsumerBase(object):
return return
data.setdefault('version', None) data.setdefault('version', None)
data.setdefault('args', []) data.setdefault('args', {})
proxy.dispatch(ctx, data['version'], proxy.dispatch(ctx, data['version'],
data['method'], **data['args']) data['method'], **data['args'])
@ -404,12 +425,6 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).__init__(conf) super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {} self.topic_proxy = {}
ipc_dir = CONF.rpc_zmq_ipc_dir
self.topic_proxy['zmq_replies'] = \
ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock): def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir ipc_dir = CONF.rpc_zmq_ipc_dir
@ -426,7 +441,7 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUB sock_type = zmq.PUB
elif topic.startswith('zmq_replies'): elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB sock_type = zmq.PUB
inside = _deserialize(in_msg) inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id'] msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response'] response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response) LOG.debug(_("->response->%s"), response)
@ -435,20 +450,81 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUSH sock_type = zmq.PUSH
if not topic in self.topic_proxy: if not topic in self.topic_proxy:
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), def publisher(waiter):
sock_type, bind=True) LOG.info(_("Creating proxy for topic: %s"), topic)
self.topic_proxy[topic] = outq
self.sockets.append(outq)
LOG.info(_("Created topic proxy: %s"), topic)
# It takes some time for a pub socket to open, try:
# before we can have any faith in doing a send() to it. out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
if sock_type == zmq.PUB: (ipc_dir, topic),
eventlet.sleep(.5) sock_type, bind=True)
except RPCException:
waiter.send_exception(*sys.exc_info())
return
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) self.topic_proxy[topic] = eventlet.queue.LightQueue(
self.topic_proxy[topic].send(data) CONF.rpc_zmq_topic_backlog)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) self.sockets.append(out_sock)
# It takes some time for a pub socket to open,
# before we can have any faith in doing a send() to it.
if sock_type == zmq.PUB:
eventlet.sleep(.5)
waiter.send(True)
while(True):
data = self.topic_proxy[topic].get()
out_sock.send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
{'data': data})
wait_sock_creation = eventlet.event.Event()
eventlet.spawn(publisher, wait_sock_creation)
try:
wait_sock_creation.wait()
except RPCException:
LOG.error(_("Topic socket file creation failed."))
return
try:
self.topic_proxy[topic].put_nowait(data)
LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
{'data': data})
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
def consume_in_thread(self):
"""Runs the ZmqProxy service"""
ipc_dir = CONF.rpc_zmq_ipc_dir
consume_in = "tcp://%s:%s" % \
(CONF.rpc_zmq_bind_address,
CONF.rpc_zmq_port)
consumption_proxy = InternalContext(None)
if not os.path.isdir(ipc_dir):
try:
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
ipc_dir, run_as_root=True)
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
except utils.ProcessExecutionError:
LOG.error(_("Could not create IPC directory %s") %
(ipc_dir, ))
raise
try:
self.register(consumption_proxy,
consume_in,
zmq.PULL,
out_bind=True)
except zmq.ZMQError:
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
raise
super(ZmqProxy, self).consume_in_thread()
class ZmqReactor(ZmqBaseReactor): class ZmqReactor(ZmqBaseReactor):
@ -473,7 +549,7 @@ class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data msg_id, topic, style, in_msg = data
ctx, request = _deserialize(in_msg) ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx) ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock] proxy = self.proxies[sock]
@ -524,7 +600,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread() self.reactor.consume_in_thread()
def _cast(addr, context, msg_id, topic, msg, timeout=None): def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
force_envelope=False):
timeout_cast = timeout or CONF.rpc_cast_timeout timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg] payload = [RpcContext.marshal(context), msg]
@ -533,7 +610,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn = ZmqClient(addr) conn = ZmqClient(addr)
# assumes cast can't return an exception # assumes cast can't return an exception
conn.cast(msg_id, topic, payload) conn.cast(msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError: except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception") raise RPCException("Cast failed. ZMQ Socket Exception")
finally: finally:
@ -541,7 +618,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn.close() conn.close()
def _call(addr, context, msg_id, topic, msg, timeout=None): def _call(addr, context, msg_id, topic, msg, timeout=None,
serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response # timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout timeout = timeout or CONF.rpc_response_timeout
@ -576,7 +654,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
) )
LOG.debug(_("Sending cast")) LOG.debug(_("Sending cast"))
_cast(addr, context, msg_id, topic, payload) _cast(addr, context, msg_id, topic, payload,
serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply")) LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply # Blocks until receives reply
@ -602,7 +681,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
return responses[-1] return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None): def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False):
""" """
Wraps the sending of messages, Wraps the sending of messages,
dispatches to the matchmaker and sends dispatches to the matchmaker and sends
@ -611,7 +691,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
conf = CONF conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
queues = matchmaker.queues(topic) queues = _get_matchmaker().queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues) LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results # Don't stack if we have no matchmaker results
@ -628,9 +708,11 @@ def _multi_send(method, context, topic, msg, timeout=None):
if method.__name__ == '_cast': if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context, eventlet.spawn_n(method, _addr, context,
_topic, _topic, msg, timeout) _topic, _topic, msg, timeout, serialize,
force_envelope)
return return
return method(_addr, context, _topic, _topic, msg, timeout) return method(_addr, context, _topic, _topic, msg, timeout,
serialize, force_envelope)
def create_connection(conf, new=True): def create_connection(conf, new=True):
@ -669,38 +751,37 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not # NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions. # work with our assumptions.
topic.replace('.', '-') topic.replace('.', '-')
kwargs['serialize'] = kwargs.pop('envelope')
kwargs['force_envelope'] = True
cast(conf, context, topic, msg, **kwargs) cast(conf, context, topic, msg, **kwargs)
def cleanup(): def cleanup():
"""Clean up resources in use by implementation.""" """Clean up resources in use by implementation."""
global ZMQ_CTX global ZMQ_CTX
global matchmaker if ZMQ_CTX:
matchmaker = None ZMQ_CTX.term()
ZMQ_CTX.term()
ZMQ_CTX = None ZMQ_CTX = None
def register_opts(conf):
"""Registration of options for this driver."""
#NOTE(ewindisch): ZMQ_CTX and matchmaker
# are initialized here as this is as good
# an initialization method as any.
# We memoize through these globals
global ZMQ_CTX
global matchmaker global matchmaker
global CONF matchmaker = None
if not CONF:
conf.register_opts(zmq_opts) def _get_ctxt():
CONF = conf if not zmq:
# Don't re-set, if this method is called twice. raise ImportError("Failed to import eventlet.green.zmq")
global ZMQ_CTX
if not ZMQ_CTX: if not ZMQ_CTX:
ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts) ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
return ZMQ_CTX
def _get_matchmaker():
global matchmaker
if not matchmaker: if not matchmaker:
# rpc_zmq_matchmaker should be set to a 'module.Class' # rpc_zmq_matchmaker should be set to a 'module.Class'
mm_path = conf.rpc_zmq_matchmaker.split('.') mm_path = CONF.rpc_zmq_matchmaker.split('.')
mm_module = '.'.join(mm_path[:-1]) mm_module = '.'.join(mm_path[:-1])
mm_class = mm_path[-1] mm_class = mm_path[-1]
@ -713,6 +794,4 @@ def register_opts(conf):
mm_impl = importutils.import_module(mm_module) mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class) mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor() matchmaker = mm_constructor()
return matchmaker
register_opts(cfg.CONF)

View File

@ -51,7 +51,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
self._services = threadgroup.ThreadGroup('launcher') self._services = threadgroup.ThreadGroup()
eventlet_backdoor.initialize_if_enabled() eventlet_backdoor.initialize_if_enabled()
@staticmethod @staticmethod
@ -310,7 +310,7 @@ class Service(object):
"""Service object for binaries running on hosts.""" """Service object for binaries running on hosts."""
def __init__(self, threads=1000): def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup('service', threads) self.tg = threadgroup.ThreadGroup(threads)
def start(self): def start(self):
pass pass

View File

@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC. # Copyright 2011 OpenStack LLC.
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -19,7 +20,7 @@
Utilities with minimum-depends for use in setup.py Utilities with minimum-depends for use in setup.py
""" """
import datetime import email
import os import os
import re import re
import subprocess import subprocess
@ -33,11 +34,12 @@ def parse_mailmap(mailmap='.mailmap'):
if os.path.exists(mailmap): if os.path.exists(mailmap):
with open(mailmap, 'r') as fp: with open(mailmap, 'r') as fp:
for l in fp: for l in fp:
l = l.strip() try:
if not l.startswith('#') and ' ' in l: canonical_email, alias = re.match(
canonical_email, alias = [x for x in l.split(' ') r'[^#]*?(<.+>).*(<.+>).*', l).groups()
if x.startswith('<')] except AttributeError:
mapping[alias] = canonical_email continue
mapping[alias] = canonical_email
return mapping return mapping
@ -45,8 +47,8 @@ def canonicalize_emails(changelog, mapping):
"""Takes in a string and an email alias mapping and replaces all """Takes in a string and an email alias mapping and replaces all
instances of the aliases in the string with their real email. instances of the aliases in the string with their real email.
""" """
for alias, email in mapping.iteritems(): for alias, email_address in mapping.iteritems():
changelog = changelog.replace(alias, email) changelog = changelog.replace(alias, email_address)
return changelog return changelog
@ -106,16 +108,6 @@ def parse_dependency_links(requirements_files=['requirements.txt',
return dependency_links return dependency_links
def write_requirements():
venv = os.environ.get('VIRTUAL_ENV', None)
if venv is not None:
with open("requirements.txt", "w") as req_file:
output = subprocess.Popen(["pip", "-E", venv, "freeze", "-l"],
stdout=subprocess.PIPE)
requirements = output.communicate()[0].strip()
req_file.write(requirements)
def _run_shell_command(cmd): def _run_shell_command(cmd):
if os.name == 'nt': if os.name == 'nt':
output = subprocess.Popen(["cmd.exe", "/C", cmd], output = subprocess.Popen(["cmd.exe", "/C", cmd],
@ -131,57 +123,6 @@ def _run_shell_command(cmd):
return out[0].strip() return out[0].strip()
def _get_git_next_version_suffix(branch_name):
datestamp = datetime.datetime.now().strftime('%Y%m%d')
if branch_name == 'milestone-proposed':
revno_prefix = "r"
else:
revno_prefix = ""
_run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*")
milestone_cmd = "git show meta/openstack/release:%s" % branch_name
milestonever = _run_shell_command(milestone_cmd)
if milestonever:
first_half = "%s~%s" % (milestonever, datestamp)
else:
first_half = datestamp
post_version = _get_git_post_version()
# post version should look like:
# 0.1.1.4.gcc9e28a
# where the bit after the last . is the short sha, and the bit between
# the last and second to last is the revno count
(revno, sha) = post_version.split(".")[-2:]
second_half = "%s%s.%s" % (revno_prefix, revno, sha)
return ".".join((first_half, second_half))
def _get_git_current_tag():
return _run_shell_command("git tag --contains HEAD")
def _get_git_tag_info():
return _run_shell_command("git describe --tags")
def _get_git_post_version():
current_tag = _get_git_current_tag()
if current_tag is not None:
return current_tag
else:
tag_info = _get_git_tag_info()
if tag_info is None:
base_version = "0.0"
cmd = "git --no-pager log --oneline"
out = _run_shell_command(cmd)
revno = len(out.split("\n"))
sha = _run_shell_command("git describe --always")
else:
tag_infos = tag_info.split("-")
base_version = "-".join(tag_infos[:-2])
(revno, sha) = tag_infos[-2:]
return "%s.%s.%s" % (base_version, revno, sha)
def write_git_changelog(): def write_git_changelog():
"""Write a changelog based on the git changelog.""" """Write a changelog based on the git changelog."""
new_changelog = 'ChangeLog' new_changelog = 'ChangeLog'
@ -227,26 +168,6 @@ _rst_template = """%(heading)s
""" """
def read_versioninfo(project):
"""Read the versioninfo file. If it doesn't exist, we're in a github
zipball, and there's really no way to know what version we really
are, but that should be ok, because the utility of that should be
just about nil if this code path is in use in the first place."""
versioninfo_path = os.path.join(project, 'versioninfo')
if os.path.exists(versioninfo_path):
with open(versioninfo_path, 'r') as vinfo:
version = vinfo.read().strip()
else:
version = "0.0.0"
return version
def write_versioninfo(project, version):
"""Write a simple file containing the version of the package."""
with open(os.path.join(project, 'versioninfo'), 'w') as fil:
fil.write("%s\n" % version)
def get_cmdclass(): def get_cmdclass():
"""Return dict of commands to run from setup.py.""" """Return dict of commands to run from setup.py."""
@ -276,6 +197,9 @@ def get_cmdclass():
from sphinx.setup_command import BuildDoc from sphinx.setup_command import BuildDoc
class LocalBuildDoc(BuildDoc): class LocalBuildDoc(BuildDoc):
builders = ['html', 'man']
def generate_autoindex(self): def generate_autoindex(self):
print "**Autodocumenting from %s" % os.path.abspath(os.curdir) print "**Autodocumenting from %s" % os.path.abspath(os.curdir)
modules = {} modules = {}
@ -311,56 +235,69 @@ def get_cmdclass():
if not os.getenv('SPHINX_DEBUG'): if not os.getenv('SPHINX_DEBUG'):
self.generate_autoindex() self.generate_autoindex()
for builder in ['html', 'man']: for builder in self.builders:
self.builder = builder self.builder = builder
self.finalize_options() self.finalize_options()
self.project = self.distribution.get_name() self.project = self.distribution.get_name()
self.version = self.distribution.get_version() self.version = self.distribution.get_version()
self.release = self.distribution.get_version() self.release = self.distribution.get_version()
BuildDoc.run(self) BuildDoc.run(self)
class LocalBuildLatex(LocalBuildDoc):
builders = ['latex']
cmdclass['build_sphinx'] = LocalBuildDoc cmdclass['build_sphinx'] = LocalBuildDoc
cmdclass['build_sphinx_latex'] = LocalBuildLatex
except ImportError: except ImportError:
pass pass
return cmdclass return cmdclass
def get_git_branchname(): def get_version_from_git():
for branch in _run_shell_command("git branch --color=never").split("\n"):
if branch.startswith('*'):
_branch_name = branch.split()[1].strip()
if _branch_name == "(no":
_branch_name = "no-branch"
return _branch_name
def get_pre_version(projectname, base_version):
"""Return a version which is leading up to a version that will
be released in the future."""
if os.path.isdir('.git'):
current_tag = _get_git_current_tag()
if current_tag is not None:
version = current_tag
else:
branch_name = os.getenv('BRANCHNAME',
os.getenv('GERRIT_REFNAME',
get_git_branchname()))
version_suffix = _get_git_next_version_suffix(branch_name)
version = "%s~%s" % (base_version, version_suffix)
write_versioninfo(projectname, version)
return version
else:
version = read_versioninfo(projectname)
return version
def get_post_version(projectname):
"""Return a version which is equal to the tag that's on the current """Return a version which is equal to the tag that's on the current
revision if there is one, or tag plus number of additional revisions revision if there is one, or tag plus number of additional revisions
if the current revision has no tag.""" if the current revision has no tag."""
if os.path.isdir('.git'): if os.path.isdir('.git'):
version = _get_git_post_version() return _run_shell_command(
write_versioninfo(projectname, version) "git describe --always").replace('-', '.')
return None
def get_version_from_pkg_info(package_name):
"""Get the version from PKG-INFO file if we can."""
try:
pkg_info_file = open('PKG-INFO', 'r')
except (IOError, OSError):
return None
try:
pkg_info = email.message_from_file(pkg_info_file)
except email.MessageError:
return None
# Check to make sure we're in our own dir
if pkg_info.get('Name', None) != package_name:
return None
return pkg_info.get('Version', None)
def get_version(package_name):
"""Get the version of the project. First, try getting it from PKG-INFO, if
it exists. If it does, that means we're in a distribution tarball or that
install has happened. Otherwise, if there is no PKG-INFO file, pull the
version from git.
We do not support setup.py version sanity in git archive tarballs, nor do
we support packagers directly sucking our git repo into theirs. We expect
that a source tarball be made from our git repo - or that if someone wants
to make a source tarball from a fork of our repo with additional tags in it
that they understand and desire the results of doing that.
"""
version = get_version_from_pkg_info(package_name)
if version:
return version return version
return read_versioninfo(projectname) version = get_version_from_git()
if version:
return version
raise Exception("Versioning for this project requires either an sdist"
" tarball, or access to an upstream git repository.")

View File

@ -38,8 +38,7 @@ class Thread(object):
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it has done so it can be removed from the threads list. it has done so it can be removed from the threads list.
""" """
def __init__(self, name, thread, group): def __init__(self, thread, group):
self.name = name
self.thread = thread self.thread = thread
self.thread.link(_thread_done, group=group, thread=self) self.thread.link(_thread_done, group=group, thread=self)
@ -57,8 +56,7 @@ class ThreadGroup(object):
when need be). when need be).
* provide an easy API to add timers. * provide an easy API to add timers.
""" """
def __init__(self, name, thread_pool_size=10): def __init__(self, thread_pool_size=10):
self.name = name
self.pool = greenpool.GreenPool(thread_pool_size) self.pool = greenpool.GreenPool(thread_pool_size)
self.threads = [] self.threads = []
self.timers = [] self.timers = []
@ -72,7 +70,7 @@ class ThreadGroup(object):
def add_thread(self, callback, *args, **kwargs): def add_thread(self, callback, *args, **kwargs):
gt = self.pool.spawn(callback, *args, **kwargs) gt = self.pool.spawn(callback, *args, **kwargs)
th = Thread(callback.__name__, gt, self) th = Thread(gt, self)
self.threads.append(th) self.threads.append(th)
def thread_done(self, thread): def thread_done(self, thread):

View File

@ -71,11 +71,15 @@ def normalize_time(timestamp):
def is_older_than(before, seconds): def is_older_than(before, seconds):
"""Return True if before is older than seconds.""" """Return True if before is older than seconds."""
if isinstance(before, basestring):
before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds) return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds): def is_newer_than(after, seconds):
"""Return True if after is newer than seconds.""" """Return True if after is newer than seconds."""
if isinstance(after, basestring):
after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds) return after - utcnow() > datetime.timedelta(seconds=seconds)

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC # Copyright 2012 OpenStack LLC
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -15,154 +15,51 @@
# under the License. # under the License.
""" """
Utilities for consuming the auto-generated versioninfo files. Utilities for consuming the version from pkg_resources.
""" """
import datetime
import pkg_resources import pkg_resources
import setup
class _deferred_version_string(str):
"""Internal helper class which provides delayed version calculation."""
def __new__(cls, version_info, prefix):
new_obj = str.__new__(cls, "")
new_obj._version_info = version_info
new_obj._prefix = prefix
new_obj._cached_version = None
return new_obj
def _get_cached_version(self):
if not self._cached_version:
self._cached_version = \
"%s%s" % (self._prefix,
self._version_info.version_string())
return self._cached_version
def __len__(self):
return self._get_cached_version().__len__()
def __contains__(self, item):
return self._get_cached_version().__contains__(item)
def __getslice__(self, i, j):
return self._get_cached_version().__getslice__(i, j)
def __str__(self):
return self._get_cached_version()
def __repr__(self):
return self._get_cached_version()
class VersionInfo(object): class VersionInfo(object):
def __init__(self, package, python_package=None, pre_version=None): def __init__(self, package):
"""Object that understands versioning for a package """Object that understands versioning for a package
:param package: name of the top level python namespace. For glance, :param package: name of the python package, such as glance, or
this would be "glance" for python-glanceclient, it python-glanceclient
would be "glanceclient"
:param python_package: optional name of the project name. For
glance this can be left unset. For
python-glanceclient, this would be
"python-glanceclient"
:param pre_version: optional version that the project is working to
""" """
self.package = package self.package = package
if python_package is None:
self.python_package = package
else:
self.python_package = python_package
self.pre_version = pre_version
self.version = None self.version = None
self._cached_version = None
def _generate_version(self): def _get_version_from_pkg_resources(self):
"""Defer to the openstack.common.setup routines for making a """Get the version of the package from the pkg_resources record
version from git.""" associated with the package."""
if self.pre_version is None: requirement = pkg_resources.Requirement.parse(self.package)
return setup.get_post_version(self.python_package) provider = pkg_resources.get_provider(requirement)
else: return provider.version
return setup.get_pre_version(self.python_package, self.pre_version)
def _newer_version(self, pending_version): def version_string(self):
"""Check to see if we're working with a stale version or not.
We expect a version string that either looks like:
2012.2~f3~20120708.10.4426392
which is an unreleased version of a pre-version, or:
0.1.1.4.gcc9e28a
which is an unreleased version of a post-version, or:
0.1.1
Which is a release and which should match tag.
For now, if we have a date-embedded version, check to see if it's
old, and if so re-generate. Otherwise, just deal with it.
"""
try:
version_date = int(self.version.split("~")[-1].split('.')[0])
if version_date < int(datetime.date.today().strftime('%Y%m%d')):
return self._generate_version()
else:
return pending_version
except Exception:
return pending_version
def version_string_with_vcs(self, always=False):
"""Return the full version of the package including suffixes indicating """Return the full version of the package including suffixes indicating
VCS status. VCS status.
For instance, if we are working towards the 2012.2 release,
canonical_version_string should return 2012.2 if this is a final
release, or else something like 2012.2~f1~20120705.20 if it's not.
:param always: if true, skip all version caching
""" """
if always:
self.version = self._generate_version()
if self.version is None: if self.version is None:
self.version = self._get_version_from_pkg_resources()
requirement = pkg_resources.Requirement.parse(self.python_package)
versioninfo = "%s/versioninfo" % self.package
try:
raw_version = pkg_resources.resource_string(requirement,
versioninfo)
self.version = self._newer_version(raw_version.strip())
except (IOError, pkg_resources.DistributionNotFound):
self.version = self._generate_version()
return self.version return self.version
def canonical_version_string(self, always=False): # Compatibility functions
"""Return the simple version of the package excluding any suffixes. canonical_version_string = version_string
version_string_with_vcs = version_string
For instance, if we are working towards the 2012.2 release, def cached_version_string(self, prefix=""):
canonical_version_string should return 2012.2 in all cases.
:param always: if true, skip all version caching
"""
return self.version_string_with_vcs(always).split('~')[0]
def version_string(self, always=False):
"""Return the base version of the package.
For instance, if we are working towards the 2012.2 release,
version_string should return 2012.2 if this is a final release, or
2012.2-dev if it is not.
:param always: if true, skip all version caching
"""
version_parts = self.version_string_with_vcs(always).split('~')
if len(version_parts) == 1:
return version_parts[0]
else:
return '%s-dev' % (version_parts[0],)
def deferred_version_string(self, prefix=""):
"""Generate an object which will expand in a string context to """Generate an object which will expand in a string context to
the results of version_string(). We do this so that don't the results of version_string(). We do this so that don't
call into pkg_resources every time we start up a program when call into pkg_resources every time we start up a program when
passing version information into the CONF constructor, but passing version information into the CONF constructor, but
rather only do the calculation when and if a version is requested rather only do the calculation when and if a version is requested
""" """
return _deferred_version_string(self, prefix) if not self._cached_version:
self._cached_version = "%s%s" % (prefix,
self.version_string())
return self._cached_version

View File

@ -172,8 +172,7 @@ class stackServiceCreateUpdateDeleteTest(unittest.TestCase):
stack.validate().AndReturn(None) stack.validate().AndReturn(None)
self.m.StubOutWithMock(threadgroup, 'ThreadGroup') self.m.StubOutWithMock(threadgroup, 'ThreadGroup')
name_match = mox.StrContains(stack.name) threadgroup.ThreadGroup().AndReturn(DummyThreadGroup())
threadgroup.ThreadGroup(name_match).AndReturn(DummyThreadGroup())
self.m.ReplayAll() self.m.ReplayAll()

View File

@ -16,5 +16,4 @@
from heat.openstack.common import version as common_version from heat.openstack.common import version as common_version
NEXT_VERSION = '2013.1' version_info = common_version.VersionInfo('heat')
version_info = common_version.VersionInfo('heat', pre_version=NEXT_VERSION)

View File

@ -16,13 +16,14 @@
import setuptools import setuptools
from heat.openstack.common import setup from heat.openstack.common import setup
from heat.version import version_info as version
requires = setup.parse_requirements() requires = setup.parse_requirements()
project = 'heat'
setuptools.setup( setuptools.setup(
name='heat', name=project,
version=version.canonical_version_string(always=True), version=setup.get_version(project),
description='The heat project provides services for provisioning ' description='The heat project provides services for provisioning '
'virtual machines', 'virtual machines',
license='Apache License (2.0)', license='Apache License (2.0)',