Update common code to support pep 1.3.
bug 1014216 Change-Id: I3f8fa2e11c9d3f3d34fb20f65ce886bb9c94463d
This commit is contained in:
@@ -391,7 +391,7 @@ def _get_config_dirs(project=None):
|
|||||||
fix_path('~'),
|
fix_path('~'),
|
||||||
os.path.join('/etc', project) if project else None,
|
os.path.join('/etc', project) if project else None,
|
||||||
'/etc'
|
'/etc'
|
||||||
]
|
]
|
||||||
|
|
||||||
return filter(bool, cfg_dirs)
|
return filter(bool, cfg_dirs)
|
||||||
|
|
||||||
@@ -559,10 +559,10 @@ class Opt(object):
|
|||||||
kwargs = self._get_optparse_kwargs(group)
|
kwargs = self._get_optparse_kwargs(group)
|
||||||
prefix = self._get_optparse_prefix('', group)
|
prefix = self._get_optparse_prefix('', group)
|
||||||
self._add_to_optparse(container, self.name, self.short, kwargs, prefix,
|
self._add_to_optparse(container, self.name, self.short, kwargs, prefix,
|
||||||
self.deprecated_name)
|
self.deprecated_name)
|
||||||
|
|
||||||
def _add_to_optparse(self, container, name, short, kwargs, prefix='',
|
def _add_to_optparse(self, container, name, short, kwargs, prefix='',
|
||||||
deprecated_name=None):
|
deprecated_name=None):
|
||||||
"""Add an option to an optparse parser or group.
|
"""Add an option to an optparse parser or group.
|
||||||
|
|
||||||
:param container: an optparse.OptionContainer object
|
:param container: an optparse.OptionContainer object
|
||||||
@@ -607,11 +607,9 @@ class Opt(object):
|
|||||||
dest = self.dest
|
dest = self.dest
|
||||||
if group is not None:
|
if group is not None:
|
||||||
dest = group.name + '_' + dest
|
dest = group.name + '_' + dest
|
||||||
kwargs.update({
|
kwargs.update({'dest': dest,
|
||||||
'dest': dest,
|
'metavar': self.metavar,
|
||||||
'metavar': self.metavar,
|
'help': self.help, })
|
||||||
'help': self.help,
|
|
||||||
})
|
|
||||||
return kwargs
|
return kwargs
|
||||||
|
|
||||||
def _get_optparse_prefix(self, prefix, group):
|
def _get_optparse_prefix(self, prefix, group):
|
||||||
@@ -676,7 +674,7 @@ class BoolOpt(Opt):
|
|||||||
prefix = self._get_optparse_prefix('no', group)
|
prefix = self._get_optparse_prefix('no', group)
|
||||||
kwargs["help"] = "The inverse of --" + self.name
|
kwargs["help"] = "The inverse of --" + self.name
|
||||||
self._add_to_optparse(container, self.name, None, kwargs, prefix,
|
self._add_to_optparse(container, self.name, None, kwargs, prefix,
|
||||||
self.deprecated_name)
|
self.deprecated_name)
|
||||||
|
|
||||||
def _get_optparse_kwargs(self, group, action='store_true', **kwargs):
|
def _get_optparse_kwargs(self, group, action='store_true', **kwargs):
|
||||||
"""Extends the base optparse keyword dict for boolean options."""
|
"""Extends the base optparse keyword dict for boolean options."""
|
||||||
@@ -946,13 +944,13 @@ class ConfigOpts(collections.Mapping):
|
|||||||
self._oparser.disable_interspersed_args()
|
self._oparser.disable_interspersed_args()
|
||||||
|
|
||||||
self._config_opts = [
|
self._config_opts = [
|
||||||
MultiStrOpt('config-file',
|
MultiStrOpt('config-file',
|
||||||
default=default_config_files,
|
default=default_config_files,
|
||||||
metavar='PATH',
|
metavar='PATH',
|
||||||
help='Path to a config file to use. Multiple config '
|
help='Path to a config file to use. Multiple config '
|
||||||
'files can be specified, with values in later '
|
'files can be specified, with values in later '
|
||||||
'files taking precedence. The default files '
|
'files taking precedence. The default files '
|
||||||
' used are: %s' % (default_config_files, )),
|
' used are: %s' % (default_config_files, )),
|
||||||
StrOpt('config-dir',
|
StrOpt('config-dir',
|
||||||
metavar='DIR',
|
metavar='DIR',
|
||||||
help='Path to a config directory to pull *.conf '
|
help='Path to a config directory to pull *.conf '
|
||||||
@@ -962,7 +960,7 @@ class ConfigOpts(collections.Mapping):
|
|||||||
'the file(s), if any, specified via --config-file, '
|
'the file(s), if any, specified via --config-file, '
|
||||||
'hence over-ridden options in the directory take '
|
'hence over-ridden options in the directory take '
|
||||||
'precedence.'),
|
'precedence.'),
|
||||||
]
|
]
|
||||||
self.register_cli_opts(self._config_opts)
|
self.register_cli_opts(self._config_opts)
|
||||||
|
|
||||||
self.project = project
|
self.project = project
|
||||||
@@ -1452,8 +1450,7 @@ class ConfigOpts(collections.Mapping):
|
|||||||
default, opt, override = [info[k] for k in sorted(info.keys())]
|
default, opt, override = [info[k] for k in sorted(info.keys())]
|
||||||
|
|
||||||
if opt.required:
|
if opt.required:
|
||||||
if (default is not None or
|
if (default is not None or override is not None):
|
||||||
override is not None):
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if self._get(opt.name, group) is None:
|
if self._get(opt.name, group) is None:
|
||||||
@@ -1557,7 +1554,7 @@ class CommonConfigOpts(ConfigOpts):
|
|||||||
short='v',
|
short='v',
|
||||||
default=False,
|
default=False,
|
||||||
help='Print more verbose output'),
|
help='Print more verbose output'),
|
||||||
]
|
]
|
||||||
|
|
||||||
logging_cli_opts = [
|
logging_cli_opts = [
|
||||||
StrOpt('log-config',
|
StrOpt('log-config',
|
||||||
@@ -1591,7 +1588,7 @@ class CommonConfigOpts(ConfigOpts):
|
|||||||
StrOpt('syslog-log-facility',
|
StrOpt('syslog-log-facility',
|
||||||
default='LOG_USER',
|
default='LOG_USER',
|
||||||
help='syslog facility to receive log lines')
|
help='syslog facility to receive log lines')
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(CommonConfigOpts, self).__init__()
|
super(CommonConfigOpts, self).__init__()
|
||||||
|
|||||||
@@ -99,15 +99,15 @@ def add_log_options(parser):
|
|||||||
"the Python logging module documentation for "
|
"the Python logging module documentation for "
|
||||||
"details on logging configuration files.")
|
"details on logging configuration files.")
|
||||||
group.add_option('--log-date-format', metavar="FORMAT",
|
group.add_option('--log-date-format', metavar="FORMAT",
|
||||||
default=DEFAULT_LOG_DATE_FORMAT,
|
default=DEFAULT_LOG_DATE_FORMAT,
|
||||||
help="Format string for %(asctime)s in log records. "
|
help="Format string for %(asctime)s in log records. "
|
||||||
"Default: %default")
|
"Default: %default")
|
||||||
group.add_option('--log-file', default=None, metavar="PATH",
|
group.add_option('--log-file', default=None, metavar="PATH",
|
||||||
help="(Optional) Name of log file to output to. "
|
help="(Optional) Name of log file to output to. "
|
||||||
"If not set, logging will go to stdout.")
|
"If not set, logging will go to stdout.")
|
||||||
group.add_option("--log-dir", default=None,
|
group.add_option("--log-dir", default=None,
|
||||||
help="(Optional) The directory to keep log files in "
|
help="(Optional) The directory to keep log files in "
|
||||||
"(will be prepended to --logfile)")
|
"(will be prepended to --logfile)")
|
||||||
group.add_option('--use-syslog', default=False, dest="use_syslog",
|
group.add_option('--use-syslog', default=False, dest="use_syslog",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help="Use syslog for logging.")
|
help="Use syslog for logging.")
|
||||||
@@ -249,7 +249,7 @@ def load_paste_config(app_name, options, args, config_dir=None):
|
|||||||
conf_file = find_config_file(app_name, options, args, config_dir)
|
conf_file = find_config_file(app_name, options, args, config_dir)
|
||||||
if not conf_file:
|
if not conf_file:
|
||||||
raise RuntimeError("Unable to locate any configuration file. "
|
raise RuntimeError("Unable to locate any configuration file. "
|
||||||
"Cannot load application %s" % app_name)
|
"Cannot load application %s" % app_name)
|
||||||
try:
|
try:
|
||||||
conf = deploy.appconfig("config:%s" % conf_file, name=app_name)
|
conf = deploy.appconfig("config:%s" % conf_file, name=app_name)
|
||||||
return conf_file, conf
|
return conf_file, conf
|
||||||
|
|||||||
@@ -44,6 +44,6 @@ def save_and_reraise_exception():
|
|||||||
yield
|
yield
|
||||||
except Exception:
|
except Exception:
|
||||||
logging.error('Original exception being dropped: %s' %
|
logging.error('Original exception being dropped: %s' %
|
||||||
(traceback.format_exception(type_, value, tb)))
|
(traceback.format_exception(type_, value, tb)))
|
||||||
raise
|
raise
|
||||||
raise type_, value, tb
|
raise type_, value, tb
|
||||||
|
|||||||
@@ -220,15 +220,15 @@ class ExtensionMiddleware(wsgi.Middleware):
|
|||||||
if not action.collection in action_resources.keys():
|
if not action.collection in action_resources.keys():
|
||||||
resource = ActionExtensionResource(application)
|
resource = ActionExtensionResource(application)
|
||||||
mapper.connect("/%s/:(id)/action.:(format)" %
|
mapper.connect("/%s/:(id)/action.:(format)" %
|
||||||
action.collection,
|
action.collection,
|
||||||
action='action',
|
action='action',
|
||||||
controller=resource,
|
controller=resource,
|
||||||
conditions=dict(method=['POST']))
|
conditions=dict(method=['POST']))
|
||||||
mapper.connect("/%s/:(id)/action" %
|
mapper.connect("/%s/:(id)/action" %
|
||||||
action.collection,
|
action.collection,
|
||||||
action='action',
|
action='action',
|
||||||
controller=resource,
|
controller=resource,
|
||||||
conditions=dict(method=['POST']))
|
conditions=dict(method=['POST']))
|
||||||
action_resources[action.collection] = resource
|
action_resources[action.collection] = resource
|
||||||
|
|
||||||
return action_resources
|
return action_resources
|
||||||
@@ -240,21 +240,20 @@ class ExtensionMiddleware(wsgi.Middleware):
|
|||||||
if not req_ext.key in request_ext_resources.keys():
|
if not req_ext.key in request_ext_resources.keys():
|
||||||
resource = RequestExtensionResource(application)
|
resource = RequestExtensionResource(application)
|
||||||
mapper.connect(req_ext.url_route + '.:(format)',
|
mapper.connect(req_ext.url_route + '.:(format)',
|
||||||
action='process',
|
action='process',
|
||||||
controller=resource,
|
controller=resource,
|
||||||
conditions=req_ext.conditions)
|
conditions=req_ext.conditions)
|
||||||
|
|
||||||
mapper.connect(req_ext.url_route,
|
mapper.connect(req_ext.url_route,
|
||||||
action='process',
|
action='process',
|
||||||
controller=resource,
|
controller=resource,
|
||||||
conditions=req_ext.conditions)
|
conditions=req_ext.conditions)
|
||||||
request_ext_resources[req_ext.key] = resource
|
request_ext_resources[req_ext.key] = resource
|
||||||
|
|
||||||
return request_ext_resources
|
return request_ext_resources
|
||||||
|
|
||||||
def __init__(self, application, config, ext_mgr=None):
|
def __init__(self, application, config, ext_mgr=None):
|
||||||
ext_mgr = ext_mgr or ExtensionManager(
|
ext_mgr = ext_mgr or ExtensionManager(config['api_extensions_path'])
|
||||||
config['api_extensions_path'])
|
|
||||||
mapper = routes.Mapper()
|
mapper = routes.Mapper()
|
||||||
|
|
||||||
# extended resources
|
# extended resources
|
||||||
@@ -275,7 +274,7 @@ class ExtensionMiddleware(wsgi.Middleware):
|
|||||||
|
|
||||||
# extended actions
|
# extended actions
|
||||||
action_resources = self._action_ext_resources(application, ext_mgr,
|
action_resources = self._action_ext_resources(application, ext_mgr,
|
||||||
mapper)
|
mapper)
|
||||||
for action in ext_mgr.get_actions():
|
for action in ext_mgr.get_actions():
|
||||||
LOG.debug(_('Extended action: %s'), action.action_name)
|
LOG.debug(_('Extended action: %s'), action.action_name)
|
||||||
resource = action_resources[action.collection]
|
resource = action_resources[action.collection]
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ def import_class(import_str):
|
|||||||
return getattr(sys.modules[mod_str], class_str)
|
return getattr(sys.modules[mod_str], class_str)
|
||||||
except (ImportError, ValueError, AttributeError), exc:
|
except (ImportError, ValueError, AttributeError), exc:
|
||||||
raise ImportError('Class %s cannot be found (%s)' %
|
raise ImportError('Class %s cannot be found (%s)' %
|
||||||
(class_str, str(exc)))
|
(class_str, str(exc)))
|
||||||
|
|
||||||
|
|
||||||
def import_object(import_str, *args, **kwargs):
|
def import_object(import_str, *args, **kwargs):
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ rpc_opts = [
|
|||||||
cfg.BoolOpt('fake_rabbit',
|
cfg.BoolOpt('fake_rabbit',
|
||||||
default=False,
|
default=False,
|
||||||
help='If passed, use a fake RabbitMQ provider'),
|
help='If passed, use a fake RabbitMQ provider'),
|
||||||
]
|
]
|
||||||
|
|
||||||
cfg.CONF.register_opts(rpc_opts)
|
cfg.CONF.register_opts(rpc_opts)
|
||||||
|
|
||||||
|
|||||||
@@ -92,8 +92,9 @@ class ConnectionContext(rpc_common.Connection):
|
|||||||
if pooled:
|
if pooled:
|
||||||
self.connection = connection_pool.get()
|
self.connection = connection_pool.get()
|
||||||
else:
|
else:
|
||||||
self.connection = connection_pool.connection_cls(conf,
|
self.connection = connection_pool.connection_cls(
|
||||||
server_params=server_params)
|
conf,
|
||||||
|
server_params=server_params)
|
||||||
self.pooled = pooled
|
self.pooled = pooled
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
@@ -161,8 +162,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
|
|||||||
msg = {'result': reply, 'failure': failure}
|
msg = {'result': reply, 'failure': failure}
|
||||||
except TypeError:
|
except TypeError:
|
||||||
msg = {'result': dict((k, repr(v))
|
msg = {'result': dict((k, repr(v))
|
||||||
for k, v in reply.__dict__.iteritems()),
|
for k, v in reply.__dict__.iteritems()),
|
||||||
'failure': failure}
|
'failure': failure}
|
||||||
if ending:
|
if ending:
|
||||||
msg['ending'] = True
|
msg['ending'] = True
|
||||||
conn.direct_send(msg_id, msg)
|
conn.direct_send(msg_id, msg)
|
||||||
@@ -288,8 +289,8 @@ class ProxyCallback(object):
|
|||||||
class MulticallWaiter(object):
|
class MulticallWaiter(object):
|
||||||
def __init__(self, conf, connection, timeout):
|
def __init__(self, conf, connection, timeout):
|
||||||
self._connection = connection
|
self._connection = connection
|
||||||
self._iterator = connection.iterconsume(
|
self._iterator = connection.iterconsume(timeout=timeout or
|
||||||
timeout=timeout or conf.rpc_response_timeout)
|
conf.rpc_response_timeout)
|
||||||
self._result = None
|
self._result = None
|
||||||
self._done = False
|
self._done = False
|
||||||
self._got_ending = False
|
self._got_ending = False
|
||||||
@@ -308,7 +309,7 @@ class MulticallWaiter(object):
|
|||||||
if data['failure']:
|
if data['failure']:
|
||||||
failure = data['failure']
|
failure = data['failure']
|
||||||
self._result = rpc_common.deserialize_remote_exception(self._conf,
|
self._result = rpc_common.deserialize_remote_exception(self._conf,
|
||||||
failure)
|
failure)
|
||||||
|
|
||||||
elif data.get('ending', False):
|
elif data.get('ending', False):
|
||||||
self._got_ending = True
|
self._got_ending = True
|
||||||
@@ -389,16 +390,16 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
|
|||||||
"""Sends a message on a topic to a specific server."""
|
"""Sends a message on a topic to a specific server."""
|
||||||
pack_context(msg, context)
|
pack_context(msg, context)
|
||||||
with ConnectionContext(conf, connection_pool, pooled=False,
|
with ConnectionContext(conf, connection_pool, pooled=False,
|
||||||
server_params=server_params) as conn:
|
server_params=server_params) as conn:
|
||||||
conn.topic_send(topic, msg)
|
conn.topic_send(topic, msg)
|
||||||
|
|
||||||
|
|
||||||
def fanout_cast_to_server(conf, context, server_params, topic, msg,
|
def fanout_cast_to_server(conf, context, server_params, topic, msg,
|
||||||
connection_pool):
|
connection_pool):
|
||||||
"""Sends a message on a fanout exchange to a specific server."""
|
"""Sends a message on a fanout exchange to a specific server."""
|
||||||
pack_context(msg, context)
|
pack_context(msg, context)
|
||||||
with ConnectionContext(conf, connection_pool, pooled=False,
|
with ConnectionContext(conf, connection_pool, pooled=False,
|
||||||
server_params=server_params) as conn:
|
server_params=server_params) as conn:
|
||||||
conn.fanout_send(topic, msg)
|
conn.fanout_send(topic, msg)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -167,10 +167,8 @@ class Connection(object):
|
|||||||
|
|
||||||
def _safe_log(log_func, msg, msg_data):
|
def _safe_log(log_func, msg, msg_data):
|
||||||
"""Sanitizes the msg_data field before logging."""
|
"""Sanitizes the msg_data field before logging."""
|
||||||
SANITIZE = {
|
SANITIZE = {'set_admin_password': ('new_pass',),
|
||||||
'set_admin_password': ('new_pass',),
|
'run_instance': ('admin_password',), }
|
||||||
'run_instance': ('admin_password',),
|
|
||||||
}
|
|
||||||
|
|
||||||
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
|
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
|
||||||
has_context_token = '_context_auth_token' in msg_data
|
has_context_token = '_context_auth_token' in msg_data
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ kombu_opts = [
|
|||||||
cfg.StrOpt('kombu_ssl_ca_certs',
|
cfg.StrOpt('kombu_ssl_ca_certs',
|
||||||
default='',
|
default='',
|
||||||
help=('SSL certification authority file '
|
help=('SSL certification authority file '
|
||||||
'(valid only if SSL enabled)')),
|
'(valid only if SSL enabled)')),
|
||||||
cfg.StrOpt('rabbit_host',
|
cfg.StrOpt('rabbit_host',
|
||||||
default='localhost',
|
default='localhost',
|
||||||
help='the RabbitMQ host'),
|
help='the RabbitMQ host'),
|
||||||
@@ -80,7 +80,7 @@ kombu_opts = [
|
|||||||
default=False,
|
default=False,
|
||||||
help='use durable queues in RabbitMQ'),
|
help='use durable queues in RabbitMQ'),
|
||||||
|
|
||||||
]
|
]
|
||||||
|
|
||||||
cfg.CONF.register_opts(kombu_opts)
|
cfg.CONF.register_opts(kombu_opts)
|
||||||
|
|
||||||
@@ -171,22 +171,20 @@ class DirectConsumer(ConsumerBase):
|
|||||||
"""
|
"""
|
||||||
# Default options
|
# Default options
|
||||||
options = {'durable': False,
|
options = {'durable': False,
|
||||||
'auto_delete': True,
|
'auto_delete': True,
|
||||||
'exclusive': True}
|
'exclusive': True}
|
||||||
options.update(kwargs)
|
options.update(kwargs)
|
||||||
exchange = kombu.entity.Exchange(
|
exchange = kombu.entity.Exchange(name=msg_id,
|
||||||
name=msg_id,
|
type='direct',
|
||||||
type='direct',
|
durable=options['durable'],
|
||||||
durable=options['durable'],
|
auto_delete=options['auto_delete'])
|
||||||
auto_delete=options['auto_delete'])
|
super(DirectConsumer, self).__init__(channel,
|
||||||
super(DirectConsumer, self).__init__(
|
callback,
|
||||||
channel,
|
tag,
|
||||||
callback,
|
name=msg_id,
|
||||||
tag,
|
exchange=exchange,
|
||||||
name=msg_id,
|
routing_key=msg_id,
|
||||||
exchange=exchange,
|
**options)
|
||||||
routing_key=msg_id,
|
|
||||||
**options)
|
|
||||||
|
|
||||||
|
|
||||||
class TopicConsumer(ConsumerBase):
|
class TopicConsumer(ConsumerBase):
|
||||||
@@ -208,22 +206,20 @@ class TopicConsumer(ConsumerBase):
|
|||||||
"""
|
"""
|
||||||
# Default options
|
# Default options
|
||||||
options = {'durable': conf.rabbit_durable_queues,
|
options = {'durable': conf.rabbit_durable_queues,
|
||||||
'auto_delete': False,
|
'auto_delete': False,
|
||||||
'exclusive': False}
|
'exclusive': False}
|
||||||
options.update(kwargs)
|
options.update(kwargs)
|
||||||
exchange = kombu.entity.Exchange(
|
exchange = kombu.entity.Exchange(name=conf.control_exchange,
|
||||||
name=conf.control_exchange,
|
type='topic',
|
||||||
type='topic',
|
durable=options['durable'],
|
||||||
durable=options['durable'],
|
auto_delete=options['auto_delete'])
|
||||||
auto_delete=options['auto_delete'])
|
super(TopicConsumer, self).__init__(channel,
|
||||||
super(TopicConsumer, self).__init__(
|
callback,
|
||||||
channel,
|
tag,
|
||||||
callback,
|
name=name or topic,
|
||||||
tag,
|
exchange=exchange,
|
||||||
name=name or topic,
|
routing_key=topic,
|
||||||
exchange=exchange,
|
**options)
|
||||||
routing_key=topic,
|
|
||||||
**options)
|
|
||||||
|
|
||||||
|
|
||||||
class FanoutConsumer(ConsumerBase):
|
class FanoutConsumer(ConsumerBase):
|
||||||
@@ -245,22 +241,17 @@ class FanoutConsumer(ConsumerBase):
|
|||||||
|
|
||||||
# Default options
|
# Default options
|
||||||
options = {'durable': False,
|
options = {'durable': False,
|
||||||
'auto_delete': True,
|
'auto_delete': True,
|
||||||
'exclusive': True}
|
'exclusive': True}
|
||||||
options.update(kwargs)
|
options.update(kwargs)
|
||||||
exchange = kombu.entity.Exchange(
|
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
|
||||||
name=exchange_name,
|
durable=options['durable'],
|
||||||
type='fanout',
|
auto_delete=options['auto_delete'])
|
||||||
durable=options['durable'],
|
super(FanoutConsumer, self).__init__(channel, callback, tag,
|
||||||
auto_delete=options['auto_delete'])
|
name=queue_name,
|
||||||
super(FanoutConsumer, self).__init__(
|
exchange=exchange,
|
||||||
channel,
|
routing_key=topic,
|
||||||
callback,
|
**options)
|
||||||
tag,
|
|
||||||
name=queue_name,
|
|
||||||
exchange=exchange,
|
|
||||||
routing_key=topic,
|
|
||||||
**options)
|
|
||||||
|
|
||||||
|
|
||||||
class Publisher(object):
|
class Publisher(object):
|
||||||
@@ -278,9 +269,10 @@ class Publisher(object):
|
|||||||
def reconnect(self, channel):
|
def reconnect(self, channel):
|
||||||
"""Re-establish the Producer after a rabbit reconnection"""
|
"""Re-establish the Producer after a rabbit reconnection"""
|
||||||
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
|
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
|
||||||
**self.kwargs)
|
**self.kwargs)
|
||||||
self.producer = kombu.messaging.Producer(exchange=self.exchange,
|
self.producer = kombu.messaging.Producer(exchange=self.exchange,
|
||||||
channel=channel, routing_key=self.routing_key)
|
channel=channel,
|
||||||
|
routing_key=self.routing_key)
|
||||||
|
|
||||||
def send(self, msg):
|
def send(self, msg):
|
||||||
"""Send a message"""
|
"""Send a message"""
|
||||||
@@ -296,14 +288,11 @@ class DirectPublisher(Publisher):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
options = {'durable': False,
|
options = {'durable': False,
|
||||||
'auto_delete': True,
|
'auto_delete': True,
|
||||||
'exclusive': True}
|
'exclusive': True}
|
||||||
options.update(kwargs)
|
options.update(kwargs)
|
||||||
super(DirectPublisher, self).__init__(channel,
|
super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
|
||||||
msg_id,
|
type='direct', **options)
|
||||||
msg_id,
|
|
||||||
type='direct',
|
|
||||||
**options)
|
|
||||||
|
|
||||||
|
|
||||||
class TopicPublisher(Publisher):
|
class TopicPublisher(Publisher):
|
||||||
@@ -314,14 +303,11 @@ class TopicPublisher(Publisher):
|
|||||||
Kombu options may be passed as keyword args to override defaults
|
Kombu options may be passed as keyword args to override defaults
|
||||||
"""
|
"""
|
||||||
options = {'durable': conf.rabbit_durable_queues,
|
options = {'durable': conf.rabbit_durable_queues,
|
||||||
'auto_delete': False,
|
'auto_delete': False,
|
||||||
'exclusive': False}
|
'exclusive': False}
|
||||||
options.update(kwargs)
|
options.update(kwargs)
|
||||||
super(TopicPublisher, self).__init__(channel,
|
super(TopicPublisher, self).__init__(channel, conf.control_exchange,
|
||||||
conf.control_exchange,
|
topic, type='topic', **options)
|
||||||
topic,
|
|
||||||
type='topic',
|
|
||||||
**options)
|
|
||||||
|
|
||||||
|
|
||||||
class FanoutPublisher(Publisher):
|
class FanoutPublisher(Publisher):
|
||||||
@@ -332,14 +318,11 @@ class FanoutPublisher(Publisher):
|
|||||||
Kombu options may be passed as keyword args to override defaults
|
Kombu options may be passed as keyword args to override defaults
|
||||||
"""
|
"""
|
||||||
options = {'durable': False,
|
options = {'durable': False,
|
||||||
'auto_delete': True,
|
'auto_delete': True,
|
||||||
'exclusive': True}
|
'exclusive': True}
|
||||||
options.update(kwargs)
|
options.update(kwargs)
|
||||||
super(FanoutPublisher, self).__init__(channel,
|
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
|
||||||
'%s_fanout' % topic,
|
None, type='fanout', **options)
|
||||||
None,
|
|
||||||
type='fanout',
|
|
||||||
**options)
|
|
||||||
|
|
||||||
|
|
||||||
class NotifyPublisher(TopicPublisher):
|
class NotifyPublisher(TopicPublisher):
|
||||||
@@ -356,10 +339,10 @@ class NotifyPublisher(TopicPublisher):
|
|||||||
# we do this to ensure that messages don't get dropped if the
|
# we do this to ensure that messages don't get dropped if the
|
||||||
# consumer is started after we do
|
# consumer is started after we do
|
||||||
queue = kombu.entity.Queue(channel=channel,
|
queue = kombu.entity.Queue(channel=channel,
|
||||||
exchange=self.exchange,
|
exchange=self.exchange,
|
||||||
durable=self.durable,
|
durable=self.durable,
|
||||||
name=self.routing_key,
|
name=self.routing_key,
|
||||||
routing_key=self.routing_key)
|
routing_key=self.routing_key)
|
||||||
queue.declare()
|
queue.declare()
|
||||||
|
|
||||||
|
|
||||||
@@ -445,7 +428,7 @@ class Connection(object):
|
|||||||
"""
|
"""
|
||||||
if self.connection:
|
if self.connection:
|
||||||
LOG.info(_("Reconnecting to AMQP server on "
|
LOG.info(_("Reconnecting to AMQP server on "
|
||||||
"%(hostname)s:%(port)d") % self.params)
|
"%(hostname)s:%(port)d") % self.params)
|
||||||
try:
|
try:
|
||||||
self.connection.close()
|
self.connection.close()
|
||||||
except self.connection_errors:
|
except self.connection_errors:
|
||||||
@@ -453,8 +436,7 @@ class Connection(object):
|
|||||||
# Setting this in case the next statement fails, though
|
# Setting this in case the next statement fails, though
|
||||||
# it shouldn't be doing any network operations, yet.
|
# it shouldn't be doing any network operations, yet.
|
||||||
self.connection = None
|
self.connection = None
|
||||||
self.connection = kombu.connection.BrokerConnection(
|
self.connection = kombu.connection.BrokerConnection(**self.params)
|
||||||
**self.params)
|
|
||||||
self.connection_errors = self.connection.connection_errors
|
self.connection_errors = self.connection.connection_errors
|
||||||
if self.memory_transport:
|
if self.memory_transport:
|
||||||
# Kludge to speed up tests.
|
# Kludge to speed up tests.
|
||||||
@@ -504,8 +486,8 @@ class Connection(object):
|
|||||||
|
|
||||||
if self.max_retries and attempt == self.max_retries:
|
if self.max_retries and attempt == self.max_retries:
|
||||||
LOG.exception(_('Unable to connect to AMQP server on '
|
LOG.exception(_('Unable to connect to AMQP server on '
|
||||||
'%(hostname)s:%(port)d after %(max_retries)d '
|
'%(hostname)s:%(port)d after %(max_retries)d '
|
||||||
'tries: %(err_str)s') % log_info)
|
'tries: %(err_str)s') % log_info)
|
||||||
# NOTE(comstud): Copied from original code. There's
|
# NOTE(comstud): Copied from original code. There's
|
||||||
# really no better recourse because if this was a queue we
|
# really no better recourse because if this was a queue we
|
||||||
# need to consume on, we have no way to consume anymore.
|
# need to consume on, we have no way to consume anymore.
|
||||||
@@ -520,8 +502,8 @@ class Connection(object):
|
|||||||
|
|
||||||
log_info['sleep_time'] = sleep_time
|
log_info['sleep_time'] = sleep_time
|
||||||
LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
|
LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
|
||||||
' unreachable: %(err_str)s. Trying again in '
|
' unreachable: %(err_str)s. Trying again in '
|
||||||
'%(sleep_time)d seconds.') % log_info)
|
'%(sleep_time)d seconds.') % log_info)
|
||||||
time.sleep(sleep_time)
|
time.sleep(sleep_time)
|
||||||
|
|
||||||
def ensure(self, error_callback, method, *args, **kwargs):
|
def ensure(self, error_callback, method, *args, **kwargs):
|
||||||
@@ -571,11 +553,11 @@ class Connection(object):
|
|||||||
def _connect_error(exc):
|
def _connect_error(exc):
|
||||||
log_info = {'topic': topic, 'err_str': str(exc)}
|
log_info = {'topic': topic, 'err_str': str(exc)}
|
||||||
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
|
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
|
||||||
"%(err_str)s") % log_info)
|
"%(err_str)s") % log_info)
|
||||||
|
|
||||||
def _declare_consumer():
|
def _declare_consumer():
|
||||||
consumer = consumer_cls(self.conf, self.channel, topic, callback,
|
consumer = consumer_cls(self.conf, self.channel, topic, callback,
|
||||||
self.consumer_num.next())
|
self.consumer_num.next())
|
||||||
self.consumers.append(consumer)
|
self.consumers.append(consumer)
|
||||||
return consumer
|
return consumer
|
||||||
|
|
||||||
@@ -589,11 +571,11 @@ class Connection(object):
|
|||||||
def _error_callback(exc):
|
def _error_callback(exc):
|
||||||
if isinstance(exc, socket.timeout):
|
if isinstance(exc, socket.timeout):
|
||||||
LOG.exception(_('Timed out waiting for RPC response: %s') %
|
LOG.exception(_('Timed out waiting for RPC response: %s') %
|
||||||
str(exc))
|
str(exc))
|
||||||
raise rpc_common.Timeout()
|
raise rpc_common.Timeout()
|
||||||
else:
|
else:
|
||||||
LOG.exception(_('Failed to consume message from queue: %s') %
|
LOG.exception(_('Failed to consume message from queue: %s') %
|
||||||
str(exc))
|
str(exc))
|
||||||
info['do_consume'] = True
|
info['do_consume'] = True
|
||||||
|
|
||||||
def _consume():
|
def _consume():
|
||||||
@@ -627,7 +609,7 @@ class Connection(object):
|
|||||||
def _error_callback(exc):
|
def _error_callback(exc):
|
||||||
log_info = {'topic': topic, 'err_str': str(exc)}
|
log_info = {'topic': topic, 'err_str': str(exc)}
|
||||||
LOG.exception(_("Failed to publish message to topic "
|
LOG.exception(_("Failed to publish message to topic "
|
||||||
"'%(topic)s': %(err_str)s") % log_info)
|
"'%(topic)s': %(err_str)s") % log_info)
|
||||||
|
|
||||||
def _publish():
|
def _publish():
|
||||||
publisher = cls(self.conf, self.channel, topic, **kwargs)
|
publisher = cls(self.conf, self.channel, topic, **kwargs)
|
||||||
@@ -691,8 +673,9 @@ class Connection(object):
|
|||||||
|
|
||||||
def create_consumer(self, topic, proxy, fanout=False):
|
def create_consumer(self, topic, proxy, fanout=False):
|
||||||
"""Create a consumer that calls a method in a proxy object"""
|
"""Create a consumer that calls a method in a proxy object"""
|
||||||
proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
|
proxy_cb = rpc_amqp.ProxyCallback(
|
||||||
rpc_amqp.get_connection_pool(self.conf, Connection))
|
self.conf, proxy,
|
||||||
|
rpc_amqp.get_connection_pool(self.conf, Connection))
|
||||||
|
|
||||||
if fanout:
|
if fanout:
|
||||||
self.declare_fanout_consumer(topic, proxy_cb)
|
self.declare_fanout_consumer(topic, proxy_cb)
|
||||||
@@ -701,57 +684,66 @@ class Connection(object):
|
|||||||
|
|
||||||
def create_worker(self, topic, proxy, pool_name):
|
def create_worker(self, topic, proxy, pool_name):
|
||||||
"""Create a worker that calls a method in a proxy object"""
|
"""Create a worker that calls a method in a proxy object"""
|
||||||
proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
|
proxy_cb = rpc_amqp.ProxyCallback(
|
||||||
rpc_amqp.get_connection_pool(self.conf, Connection))
|
self.conf, proxy,
|
||||||
|
rpc_amqp.get_connection_pool(self.conf, Connection))
|
||||||
self.declare_topic_consumer(topic, proxy_cb, pool_name)
|
self.declare_topic_consumer(topic, proxy_cb, pool_name)
|
||||||
|
|
||||||
|
|
||||||
def create_connection(conf, new=True):
|
def create_connection(conf, new=True):
|
||||||
"""Create a connection"""
|
"""Create a connection"""
|
||||||
return rpc_amqp.create_connection(conf, new,
|
return rpc_amqp.create_connection(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, new,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def multicall(conf, context, topic, msg, timeout=None):
|
def multicall(conf, context, topic, msg, timeout=None):
|
||||||
"""Make a call that returns multiple times."""
|
"""Make a call that returns multiple times."""
|
||||||
return rpc_amqp.multicall(conf, context, topic, msg, timeout,
|
return rpc_amqp.multicall(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, context, topic, msg, timeout,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def call(conf, context, topic, msg, timeout=None):
|
def call(conf, context, topic, msg, timeout=None):
|
||||||
"""Sends a message on a topic and wait for a response."""
|
"""Sends a message on a topic and wait for a response."""
|
||||||
return rpc_amqp.call(conf, context, topic, msg, timeout,
|
return rpc_amqp.call(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, context, topic, msg, timeout,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def cast(conf, context, topic, msg):
|
def cast(conf, context, topic, msg):
|
||||||
"""Sends a message on a topic without waiting for a response."""
|
"""Sends a message on a topic without waiting for a response."""
|
||||||
return rpc_amqp.cast(conf, context, topic, msg,
|
return rpc_amqp.cast(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, context, topic, msg,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def fanout_cast(conf, context, topic, msg):
|
def fanout_cast(conf, context, topic, msg):
|
||||||
"""Sends a message on a fanout exchange without waiting for a response."""
|
"""Sends a message on a fanout exchange without waiting for a response."""
|
||||||
return rpc_amqp.fanout_cast(conf, context, topic, msg,
|
return rpc_amqp.fanout_cast(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, context, topic, msg,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def cast_to_server(conf, context, server_params, topic, msg):
|
def cast_to_server(conf, context, server_params, topic, msg):
|
||||||
"""Sends a message on a topic to a specific server."""
|
"""Sends a message on a topic to a specific server."""
|
||||||
return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
|
return rpc_amqp.cast_to_server(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, context, server_params, topic, msg,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def fanout_cast_to_server(conf, context, server_params, topic, msg):
|
def fanout_cast_to_server(conf, context, server_params, topic, msg):
|
||||||
"""Sends a message on a fanout exchange to a specific server."""
|
"""Sends a message on a fanout exchange to a specific server."""
|
||||||
return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
|
return rpc_amqp.cast_to_server(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, context, server_params, topic, msg,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def notify(conf, context, topic, msg):
|
def notify(conf, context, topic, msg):
|
||||||
"""Sends a notification event on a topic."""
|
"""Sends a notification event on a topic."""
|
||||||
return rpc_amqp.notify(conf, context, topic, msg,
|
return rpc_amqp.notify(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, context, topic, msg,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def cleanup():
|
def cleanup():
|
||||||
|
|||||||
@@ -77,7 +77,7 @@ qpid_opts = [
|
|||||||
cfg.BoolOpt('qpid_tcp_nodelay',
|
cfg.BoolOpt('qpid_tcp_nodelay',
|
||||||
default=True,
|
default=True,
|
||||||
help='Disable Nagle algorithm'),
|
help='Disable Nagle algorithm'),
|
||||||
]
|
]
|
||||||
|
|
||||||
cfg.CONF.register_opts(qpid_opts)
|
cfg.CONF.register_opts(qpid_opts)
|
||||||
|
|
||||||
@@ -161,10 +161,10 @@ class DirectConsumer(ConsumerBase):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
super(DirectConsumer, self).__init__(session, callback,
|
super(DirectConsumer, self).__init__(session, callback,
|
||||||
"%s/%s" % (msg_id, msg_id),
|
"%s/%s" % (msg_id, msg_id),
|
||||||
{"type": "direct"},
|
{"type": "direct"},
|
||||||
msg_id,
|
msg_id,
|
||||||
{"exclusive": True})
|
{"exclusive": True})
|
||||||
|
|
||||||
|
|
||||||
class TopicConsumer(ConsumerBase):
|
class TopicConsumer(ConsumerBase):
|
||||||
@@ -181,8 +181,9 @@ class TopicConsumer(ConsumerBase):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
super(TopicConsumer, self).__init__(session, callback,
|
super(TopicConsumer, self).__init__(session, callback,
|
||||||
"%s/%s" % (conf.control_exchange, topic), {},
|
"%s/%s" % (conf.control_exchange,
|
||||||
name or topic, {})
|
topic),
|
||||||
|
{}, name or topic, {})
|
||||||
|
|
||||||
|
|
||||||
class FanoutConsumer(ConsumerBase):
|
class FanoutConsumer(ConsumerBase):
|
||||||
@@ -196,11 +197,12 @@ class FanoutConsumer(ConsumerBase):
|
|||||||
'callback' is the callback to call when messages are received
|
'callback' is the callback to call when messages are received
|
||||||
"""
|
"""
|
||||||
|
|
||||||
super(FanoutConsumer, self).__init__(session, callback,
|
super(FanoutConsumer, self).__init__(
|
||||||
"%s_fanout" % topic,
|
session, callback,
|
||||||
{"durable": False, "type": "fanout"},
|
"%s_fanout" % topic,
|
||||||
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
|
{"durable": False, "type": "fanout"},
|
||||||
{"exclusive": True})
|
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
|
||||||
|
{"exclusive": True})
|
||||||
|
|
||||||
|
|
||||||
class Publisher(object):
|
class Publisher(object):
|
||||||
@@ -254,8 +256,9 @@ class TopicPublisher(Publisher):
|
|||||||
def __init__(self, conf, session, topic):
|
def __init__(self, conf, session, topic):
|
||||||
"""init a 'topic' publisher.
|
"""init a 'topic' publisher.
|
||||||
"""
|
"""
|
||||||
super(TopicPublisher, self).__init__(session,
|
super(TopicPublisher, self).__init__(
|
||||||
"%s/%s" % (conf.control_exchange, topic))
|
session,
|
||||||
|
"%s/%s" % (conf.control_exchange, topic))
|
||||||
|
|
||||||
|
|
||||||
class FanoutPublisher(Publisher):
|
class FanoutPublisher(Publisher):
|
||||||
@@ -263,8 +266,9 @@ class FanoutPublisher(Publisher):
|
|||||||
def __init__(self, conf, session, topic):
|
def __init__(self, conf, session, topic):
|
||||||
"""init a 'fanout' publisher.
|
"""init a 'fanout' publisher.
|
||||||
"""
|
"""
|
||||||
super(FanoutPublisher, self).__init__(session,
|
super(FanoutPublisher, self).__init__(
|
||||||
"%s_fanout" % topic, {"type": "fanout"})
|
session,
|
||||||
|
"%s_fanout" % topic, {"type": "fanout"})
|
||||||
|
|
||||||
|
|
||||||
class NotifyPublisher(Publisher):
|
class NotifyPublisher(Publisher):
|
||||||
@@ -272,9 +276,10 @@ class NotifyPublisher(Publisher):
|
|||||||
def __init__(self, conf, session, topic):
|
def __init__(self, conf, session, topic):
|
||||||
"""init a 'topic' publisher.
|
"""init a 'topic' publisher.
|
||||||
"""
|
"""
|
||||||
super(NotifyPublisher, self).__init__(session,
|
super(NotifyPublisher, self).__init__(
|
||||||
"%s/%s" % (conf.control_exchange, topic),
|
session,
|
||||||
{"durable": True})
|
"%s/%s" % (conf.control_exchange, topic),
|
||||||
|
{"durable": True})
|
||||||
|
|
||||||
|
|
||||||
class Connection(object):
|
class Connection(object):
|
||||||
@@ -292,9 +297,9 @@ class Connection(object):
|
|||||||
server_params = {}
|
server_params = {}
|
||||||
|
|
||||||
default_params = dict(hostname=self.conf.qpid_hostname,
|
default_params = dict(hostname=self.conf.qpid_hostname,
|
||||||
port=self.conf.qpid_port,
|
port=self.conf.qpid_port,
|
||||||
username=self.conf.qpid_username,
|
username=self.conf.qpid_username,
|
||||||
password=self.conf.qpid_password)
|
password=self.conf.qpid_password)
|
||||||
|
|
||||||
params = server_params
|
params = server_params
|
||||||
for key in default_params.keys():
|
for key in default_params.keys():
|
||||||
@@ -312,18 +317,18 @@ class Connection(object):
|
|||||||
self.connection.reconnect = self.conf.qpid_reconnect
|
self.connection.reconnect = self.conf.qpid_reconnect
|
||||||
if self.conf.qpid_reconnect_timeout:
|
if self.conf.qpid_reconnect_timeout:
|
||||||
self.connection.reconnect_timeout = (
|
self.connection.reconnect_timeout = (
|
||||||
self.conf.qpid_reconnect_timeout)
|
self.conf.qpid_reconnect_timeout)
|
||||||
if self.conf.qpid_reconnect_limit:
|
if self.conf.qpid_reconnect_limit:
|
||||||
self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
|
self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
|
||||||
if self.conf.qpid_reconnect_interval_max:
|
if self.conf.qpid_reconnect_interval_max:
|
||||||
self.connection.reconnect_interval_max = (
|
self.connection.reconnect_interval_max = (
|
||||||
self.conf.qpid_reconnect_interval_max)
|
self.conf.qpid_reconnect_interval_max)
|
||||||
if self.conf.qpid_reconnect_interval_min:
|
if self.conf.qpid_reconnect_interval_min:
|
||||||
self.connection.reconnect_interval_min = (
|
self.connection.reconnect_interval_min = (
|
||||||
self.conf.qpid_reconnect_interval_min)
|
self.conf.qpid_reconnect_interval_min)
|
||||||
if self.conf.qpid_reconnect_interval:
|
if self.conf.qpid_reconnect_interval:
|
||||||
self.connection.reconnect_interval = (
|
self.connection.reconnect_interval = (
|
||||||
self.conf.qpid_reconnect_interval)
|
self.conf.qpid_reconnect_interval)
|
||||||
self.connection.hearbeat = self.conf.qpid_heartbeat
|
self.connection.hearbeat = self.conf.qpid_heartbeat
|
||||||
self.connection.protocol = self.conf.qpid_protocol
|
self.connection.protocol = self.conf.qpid_protocol
|
||||||
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
|
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
|
||||||
@@ -395,7 +400,7 @@ class Connection(object):
|
|||||||
def _connect_error(exc):
|
def _connect_error(exc):
|
||||||
log_info = {'topic': topic, 'err_str': str(exc)}
|
log_info = {'topic': topic, 'err_str': str(exc)}
|
||||||
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
|
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
|
||||||
"%(err_str)s") % log_info)
|
"%(err_str)s") % log_info)
|
||||||
|
|
||||||
def _declare_consumer():
|
def _declare_consumer():
|
||||||
consumer = consumer_cls(self.conf, self.session, topic, callback)
|
consumer = consumer_cls(self.conf, self.session, topic, callback)
|
||||||
@@ -410,11 +415,11 @@ class Connection(object):
|
|||||||
def _error_callback(exc):
|
def _error_callback(exc):
|
||||||
if isinstance(exc, qpid.messaging.exceptions.Empty):
|
if isinstance(exc, qpid.messaging.exceptions.Empty):
|
||||||
LOG.exception(_('Timed out waiting for RPC response: %s') %
|
LOG.exception(_('Timed out waiting for RPC response: %s') %
|
||||||
str(exc))
|
str(exc))
|
||||||
raise rpc_common.Timeout()
|
raise rpc_common.Timeout()
|
||||||
else:
|
else:
|
||||||
LOG.exception(_('Failed to consume message from queue: %s') %
|
LOG.exception(_('Failed to consume message from queue: %s') %
|
||||||
str(exc))
|
str(exc))
|
||||||
|
|
||||||
def _consume():
|
def _consume():
|
||||||
nxt_receiver = self.session.next_receiver(timeout=timeout)
|
nxt_receiver = self.session.next_receiver(timeout=timeout)
|
||||||
@@ -444,7 +449,7 @@ class Connection(object):
|
|||||||
def _connect_error(exc):
|
def _connect_error(exc):
|
||||||
log_info = {'topic': topic, 'err_str': str(exc)}
|
log_info = {'topic': topic, 'err_str': str(exc)}
|
||||||
LOG.exception(_("Failed to publish message to topic "
|
LOG.exception(_("Failed to publish message to topic "
|
||||||
"'%(topic)s': %(err_str)s") % log_info)
|
"'%(topic)s': %(err_str)s") % log_info)
|
||||||
|
|
||||||
def _publisher_send():
|
def _publisher_send():
|
||||||
publisher = cls(self.conf, self.session, topic)
|
publisher = cls(self.conf, self.session, topic)
|
||||||
@@ -508,8 +513,9 @@ class Connection(object):
|
|||||||
|
|
||||||
def create_consumer(self, topic, proxy, fanout=False):
|
def create_consumer(self, topic, proxy, fanout=False):
|
||||||
"""Create a consumer that calls a method in a proxy object"""
|
"""Create a consumer that calls a method in a proxy object"""
|
||||||
proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
|
proxy_cb = rpc_amqp.ProxyCallback(
|
||||||
rpc_amqp.get_connection_pool(self.conf, Connection))
|
self.conf, proxy,
|
||||||
|
rpc_amqp.get_connection_pool(self.conf, Connection))
|
||||||
|
|
||||||
if fanout:
|
if fanout:
|
||||||
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
|
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
|
||||||
@@ -522,8 +528,9 @@ class Connection(object):
|
|||||||
|
|
||||||
def create_worker(self, topic, proxy, pool_name):
|
def create_worker(self, topic, proxy, pool_name):
|
||||||
"""Create a worker that calls a method in a proxy object"""
|
"""Create a worker that calls a method in a proxy object"""
|
||||||
proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
|
proxy_cb = rpc_amqp.ProxyCallback(
|
||||||
rpc_amqp.get_connection_pool(self.conf, Connection))
|
self.conf, proxy,
|
||||||
|
rpc_amqp.get_connection_pool(self.conf, Connection))
|
||||||
|
|
||||||
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
|
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
|
||||||
name=pool_name)
|
name=pool_name)
|
||||||
@@ -535,50 +542,57 @@ class Connection(object):
|
|||||||
|
|
||||||
def create_connection(conf, new=True):
|
def create_connection(conf, new=True):
|
||||||
"""Create a connection"""
|
"""Create a connection"""
|
||||||
return rpc_amqp.create_connection(conf, new,
|
return rpc_amqp.create_connection(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, new,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def multicall(conf, context, topic, msg, timeout=None):
|
def multicall(conf, context, topic, msg, timeout=None):
|
||||||
"""Make a call that returns multiple times."""
|
"""Make a call that returns multiple times."""
|
||||||
return rpc_amqp.multicall(conf, context, topic, msg, timeout,
|
return rpc_amqp.multicall(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, context, topic, msg, timeout,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def call(conf, context, topic, msg, timeout=None):
|
def call(conf, context, topic, msg, timeout=None):
|
||||||
"""Sends a message on a topic and wait for a response."""
|
"""Sends a message on a topic and wait for a response."""
|
||||||
return rpc_amqp.call(conf, context, topic, msg, timeout,
|
return rpc_amqp.call(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, context, topic, msg, timeout,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def cast(conf, context, topic, msg):
|
def cast(conf, context, topic, msg):
|
||||||
"""Sends a message on a topic without waiting for a response."""
|
"""Sends a message on a topic without waiting for a response."""
|
||||||
return rpc_amqp.cast(conf, context, topic, msg,
|
return rpc_amqp.cast(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, context, topic, msg,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def fanout_cast(conf, context, topic, msg):
|
def fanout_cast(conf, context, topic, msg):
|
||||||
"""Sends a message on a fanout exchange without waiting for a response."""
|
"""Sends a message on a fanout exchange without waiting for a response."""
|
||||||
return rpc_amqp.fanout_cast(conf, context, topic, msg,
|
return rpc_amqp.fanout_cast(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, context, topic, msg,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def cast_to_server(conf, context, server_params, topic, msg):
|
def cast_to_server(conf, context, server_params, topic, msg):
|
||||||
"""Sends a message on a topic to a specific server."""
|
"""Sends a message on a topic to a specific server."""
|
||||||
return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
|
return rpc_amqp.cast_to_server(
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
conf, context, server_params, topic, msg,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def fanout_cast_to_server(conf, context, server_params, topic, msg):
|
def fanout_cast_to_server(conf, context, server_params, topic, msg):
|
||||||
"""Sends a message on a fanout exchange to a specific server."""
|
"""Sends a message on a fanout exchange to a specific server."""
|
||||||
return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic,
|
return rpc_amqp.fanout_cast_to_server(
|
||||||
msg, rpc_amqp.get_connection_pool(conf, Connection))
|
conf, context, server_params, topic, msg,
|
||||||
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def notify(conf, context, topic, msg):
|
def notify(conf, context, topic, msg):
|
||||||
"""Sends a notification event on a topic."""
|
"""Sends a notification event on a topic."""
|
||||||
return rpc_amqp.notify(conf, context, topic, msg,
|
return rpc_amqp.notify(conf, context, topic, msg,
|
||||||
rpc_amqp.get_connection_pool(conf, Connection))
|
rpc_amqp.get_connection_pool(conf, Connection))
|
||||||
|
|
||||||
|
|
||||||
def cleanup():
|
def cleanup():
|
||||||
|
|||||||
@@ -40,25 +40,26 @@ RPCException = rpc_common.RPCException
|
|||||||
|
|
||||||
zmq_opts = [
|
zmq_opts = [
|
||||||
cfg.StrOpt('rpc_zmq_bind_address', default='*',
|
cfg.StrOpt('rpc_zmq_bind_address', default='*',
|
||||||
help='ZeroMQ bind address. Should be a wildcard (*), '
|
help='ZeroMQ bind address. Should be a wildcard (*), '
|
||||||
'an ethernet interface, or IP. '
|
'an ethernet interface, or IP. '
|
||||||
'The "host" option should point or resolve to this address.'),
|
'The "host" option should point or resolve to this '
|
||||||
|
'address.'),
|
||||||
|
|
||||||
# The module.Class to use for matchmaking.
|
# The module.Class to use for matchmaking.
|
||||||
cfg.StrOpt('rpc_zmq_matchmaker',
|
cfg.StrOpt('rpc_zmq_matchmaker',
|
||||||
default='openstack.common.rpc.matchmaker.MatchMakerLocalhost',
|
default='openstack.common.rpc.matchmaker.MatchMakerLocalhost',
|
||||||
help='MatchMaker driver'),
|
help='MatchMaker driver'),
|
||||||
|
|
||||||
# The following port is unassigned by IANA as of 2012-05-21
|
# The following port is unassigned by IANA as of 2012-05-21
|
||||||
cfg.IntOpt('rpc_zmq_port', default=9501,
|
cfg.IntOpt('rpc_zmq_port', default=9501,
|
||||||
help='ZeroMQ receiver listening port'),
|
help='ZeroMQ receiver listening port'),
|
||||||
|
|
||||||
cfg.IntOpt('rpc_zmq_contexts', default=1,
|
cfg.IntOpt('rpc_zmq_contexts', default=1,
|
||||||
help='Number of ZeroMQ contexts, defaults to 1'),
|
help='Number of ZeroMQ contexts, defaults to 1'),
|
||||||
|
|
||||||
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
|
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
|
||||||
help='Directory for holding IPC sockets'),
|
help='Directory for holding IPC sockets'),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
# These globals are defined in register_opts(conf),
|
# These globals are defined in register_opts(conf),
|
||||||
@@ -119,10 +120,10 @@ class ZmqSocket(object):
|
|||||||
self.subscribe(f)
|
self.subscribe(f)
|
||||||
|
|
||||||
LOG.debug(_("Connecting to %{addr}s with %{type}s"
|
LOG.debug(_("Connecting to %{addr}s with %{type}s"
|
||||||
"\n-> Subscribed to %{subscribe}s"
|
"\n-> Subscribed to %{subscribe}s"
|
||||||
"\n-> bind: %{bind}s"),
|
"\n-> bind: %{bind}s"),
|
||||||
{'addr': addr, 'type': self.socket_s(),
|
{'addr': addr, 'type': self.socket_s(),
|
||||||
'subscribe': subscribe, 'bind': bind})
|
'subscribe': subscribe, 'bind': bind})
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if bind:
|
if bind:
|
||||||
@@ -197,7 +198,7 @@ class ZmqClient(object):
|
|||||||
|
|
||||||
def cast(self, msg_id, topic, data):
|
def cast(self, msg_id, topic, data):
|
||||||
self.outq.send([str(msg_id), str(topic), str('cast'),
|
self.outq.send([str(msg_id), str(topic), str('cast'),
|
||||||
_serialize(data)])
|
_serialize(data)])
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.outq.close()
|
self.outq.close()
|
||||||
@@ -306,7 +307,7 @@ class ConsumerBase(object):
|
|||||||
data.setdefault('version', None)
|
data.setdefault('version', None)
|
||||||
data.setdefault('args', [])
|
data.setdefault('args', [])
|
||||||
proxy.dispatch(ctx, data['version'],
|
proxy.dispatch(ctx, data['version'],
|
||||||
data['method'], **data['args'])
|
data['method'], **data['args'])
|
||||||
|
|
||||||
|
|
||||||
class ZmqBaseReactor(ConsumerBase):
|
class ZmqBaseReactor(ConsumerBase):
|
||||||
@@ -339,7 +340,7 @@ class ZmqBaseReactor(ConsumerBase):
|
|||||||
|
|
||||||
# Items push in.
|
# Items push in.
|
||||||
inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
|
inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
|
||||||
subscribe=subscribe)
|
subscribe=subscribe)
|
||||||
|
|
||||||
self.proxies[inq] = proxy
|
self.proxies[inq] = proxy
|
||||||
self.sockets.append(inq)
|
self.sockets.append(inq)
|
||||||
@@ -353,8 +354,7 @@ class ZmqBaseReactor(ConsumerBase):
|
|||||||
raise RPCException("Bad output socktype")
|
raise RPCException("Bad output socktype")
|
||||||
|
|
||||||
# Items push out.
|
# Items push out.
|
||||||
outq = ZmqSocket(out_addr, zmq_type_out,
|
outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
|
||||||
bind=out_bind)
|
|
||||||
|
|
||||||
self.mapping[inq] = outq
|
self.mapping[inq] = outq
|
||||||
self.mapping[outq] = inq
|
self.mapping[outq] = inq
|
||||||
@@ -428,7 +428,7 @@ class ZmqProxy(ZmqBaseReactor):
|
|||||||
|
|
||||||
if not topic in self.topic_proxy:
|
if not topic in self.topic_proxy:
|
||||||
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
|
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
|
||||||
sock_type, bind=True)
|
sock_type, bind=True)
|
||||||
self.topic_proxy[topic] = outq
|
self.topic_proxy[topic] = outq
|
||||||
self.sockets.append(outq)
|
self.sockets.append(outq)
|
||||||
LOG.info(_("Created topic proxy: %s"), topic)
|
LOG.info(_("Created topic proxy: %s"), topic)
|
||||||
@@ -486,7 +486,7 @@ class Connection(rpc_common.Connection):
|
|||||||
topic = topic.split('.', 1)[0]
|
topic = topic.split('.', 1)[0]
|
||||||
|
|
||||||
LOG.info(_("Create Consumer for topic (%(topic)s)") %
|
LOG.info(_("Create Consumer for topic (%(topic)s)") %
|
||||||
{'topic': topic})
|
{'topic': topic})
|
||||||
|
|
||||||
# Subscription scenarios
|
# Subscription scenarios
|
||||||
if fanout:
|
if fanout:
|
||||||
@@ -502,7 +502,7 @@ class Connection(rpc_common.Connection):
|
|||||||
(self.conf.rpc_zmq_ipc_dir, topic)
|
(self.conf.rpc_zmq_ipc_dir, topic)
|
||||||
|
|
||||||
LOG.debug(_("Consumer is a zmq.%s"),
|
LOG.debug(_("Consumer is a zmq.%s"),
|
||||||
['PULL', 'SUB'][sock_type == zmq.SUB])
|
['PULL', 'SUB'][sock_type == zmq.SUB])
|
||||||
|
|
||||||
self.reactor.register(proxy, inaddr, sock_type,
|
self.reactor.register(proxy, inaddr, sock_type,
|
||||||
subscribe=subscribe, in_bind=False)
|
subscribe=subscribe, in_bind=False)
|
||||||
|
|||||||
@@ -29,8 +29,8 @@ from openstack.common import cfg
|
|||||||
matchmaker_opts = [
|
matchmaker_opts = [
|
||||||
# Matchmaker ring file
|
# Matchmaker ring file
|
||||||
cfg.StrOpt('matchmaker_ringfile',
|
cfg.StrOpt('matchmaker_ringfile',
|
||||||
default='/etc/nova/matchmaker_ring.json',
|
default='/etc/nova/matchmaker_ring.json',
|
||||||
help='Matchmaker ring file (JSON)'),
|
help='Matchmaker ring file (JSON)'),
|
||||||
]
|
]
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|||||||
@@ -127,7 +127,7 @@ class RpcProxy(object):
|
|||||||
rpc.fanout_cast(context, self.topic, msg)
|
rpc.fanout_cast(context, self.topic, msg)
|
||||||
|
|
||||||
def cast_to_server(self, context, server_params, msg, topic=None,
|
def cast_to_server(self, context, server_params, msg, topic=None,
|
||||||
version=None):
|
version=None):
|
||||||
"""rpc.cast_to_server() a remote method.
|
"""rpc.cast_to_server() a remote method.
|
||||||
|
|
||||||
:param context: The request context
|
:param context: The request context
|
||||||
|
|||||||
@@ -118,13 +118,13 @@ def execute(*cmd, **kwargs):
|
|||||||
LOG.debug(_('Result was %s') % _returncode)
|
LOG.debug(_('Result was %s') % _returncode)
|
||||||
if (isinstance(check_exit_code, int) and
|
if (isinstance(check_exit_code, int) and
|
||||||
not isinstance(check_exit_code, bool) and
|
not isinstance(check_exit_code, bool) and
|
||||||
_returncode != check_exit_code):
|
_returncode != check_exit_code):
|
||||||
(stdout, stderr) = result
|
(stdout, stderr) = result
|
||||||
raise exception.ProcessExecutionError(
|
raise exception.ProcessExecutionError(
|
||||||
exit_code=_returncode,
|
exit_code=_returncode,
|
||||||
stdout=stdout,
|
stdout=stdout,
|
||||||
stderr=stderr,
|
stderr=stderr,
|
||||||
cmd=' '.join(cmd))
|
cmd=' '.join(cmd))
|
||||||
return result
|
return result
|
||||||
except exception.ProcessExecutionError:
|
except exception.ProcessExecutionError:
|
||||||
if not attempts:
|
if not attempts:
|
||||||
|
|||||||
3
setup.py
3
setup.py
@@ -20,8 +20,7 @@ setup(name='openstack.common',
|
|||||||
'License :: OSI Approved :: Apache Software License',
|
'License :: OSI Approved :: Apache Software License',
|
||||||
'Operating System :: POSIX :: Linux',
|
'Operating System :: POSIX :: Linux',
|
||||||
'Programming Language :: Python :: 2.6',
|
'Programming Language :: Python :: 2.6',
|
||||||
'Environment :: No Input/Output (Daemon)',
|
'Environment :: No Input/Output (Daemon)', ],
|
||||||
],
|
|
||||||
keywords='openstack',
|
keywords='openstack',
|
||||||
author='OpenStack',
|
author='OpenStack',
|
||||||
author_email='openstack@lists.launchpad.net',
|
author_email='openstack@lists.launchpad.net',
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ class Foxinsocks(object):
|
|||||||
def get_resources(self):
|
def get_resources(self):
|
||||||
resources = []
|
resources = []
|
||||||
resource = extensions.ResourceExtension('foxnsocks',
|
resource = extensions.ResourceExtension('foxnsocks',
|
||||||
FoxInSocksController())
|
FoxInSocksController())
|
||||||
resources.append(resource)
|
resources.append(resource)
|
||||||
return resources
|
return resources
|
||||||
|
|
||||||
@@ -73,7 +73,7 @@ class Foxinsocks(object):
|
|||||||
return res
|
return res
|
||||||
|
|
||||||
req_ext1 = extensions.RequestExtension('GET', '/dummy_resources/:(id)',
|
req_ext1 = extensions.RequestExtension('GET', '/dummy_resources/:(id)',
|
||||||
_goose_handler)
|
_goose_handler)
|
||||||
request_exts.append(req_ext1)
|
request_exts.append(req_ext1)
|
||||||
|
|
||||||
def _bands_handler(req, res):
|
def _bands_handler(req, res):
|
||||||
@@ -85,7 +85,7 @@ class Foxinsocks(object):
|
|||||||
return res
|
return res
|
||||||
|
|
||||||
req_ext2 = extensions.RequestExtension('GET', '/dummy_resources/:(id)',
|
req_ext2 = extensions.RequestExtension('GET', '/dummy_resources/:(id)',
|
||||||
_bands_handler)
|
_bands_handler)
|
||||||
request_exts.append(req_ext2)
|
request_exts.append(req_ext2)
|
||||||
return request_exts
|
return request_exts
|
||||||
|
|
||||||
|
|||||||
@@ -80,8 +80,8 @@ class BaseRpcTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
value = 42
|
value = 42
|
||||||
result = self.rpc.call(FLAGS, self.context, self.topic,
|
result = self.rpc.call(FLAGS, self.context, self.topic,
|
||||||
{"method": "echo_three_times_yield",
|
{"method": "echo_three_times_yield",
|
||||||
"args": {"value": value}})
|
"args": {"value": value}})
|
||||||
self.assertEqual(value + 2, result)
|
self.assertEqual(value + 2, result)
|
||||||
|
|
||||||
def test_multicall_succeed_once(self):
|
def test_multicall_succeed_once(self):
|
||||||
@@ -90,9 +90,9 @@ class BaseRpcTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
value = 42
|
value = 42
|
||||||
result = self.rpc.multicall(FLAGS, self.context,
|
result = self.rpc.multicall(FLAGS, self.context,
|
||||||
self.topic,
|
self.topic,
|
||||||
{"method": "echo",
|
{"method": "echo",
|
||||||
"args": {"value": value}})
|
"args": {"value": value}})
|
||||||
for i, x in enumerate(result):
|
for i, x in enumerate(result):
|
||||||
if i > 0:
|
if i > 0:
|
||||||
self.fail('should only receive one response')
|
self.fail('should only receive one response')
|
||||||
@@ -104,9 +104,9 @@ class BaseRpcTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
value = 42
|
value = 42
|
||||||
result = self.rpc.multicall(FLAGS, self.context,
|
result = self.rpc.multicall(FLAGS, self.context,
|
||||||
self.topic,
|
self.topic,
|
||||||
{"method": "multicall_three_nones",
|
{"method": "multicall_three_nones",
|
||||||
"args": {"value": value}})
|
"args": {"value": value}})
|
||||||
for i, x in enumerate(result):
|
for i, x in enumerate(result):
|
||||||
self.assertEqual(x, None)
|
self.assertEqual(x, None)
|
||||||
# i should have been 0, 1, and finally 2:
|
# i should have been 0, 1, and finally 2:
|
||||||
@@ -118,9 +118,9 @@ class BaseRpcTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
value = 42
|
value = 42
|
||||||
result = self.rpc.multicall(FLAGS, self.context,
|
result = self.rpc.multicall(FLAGS, self.context,
|
||||||
self.topic,
|
self.topic,
|
||||||
{"method": "echo_three_times_yield",
|
{"method": "echo_three_times_yield",
|
||||||
"args": {"value": value}})
|
"args": {"value": value}})
|
||||||
for i, x in enumerate(result):
|
for i, x in enumerate(result):
|
||||||
self.assertEqual(value + i, x)
|
self.assertEqual(value + i, x)
|
||||||
|
|
||||||
@@ -131,8 +131,8 @@ class BaseRpcTestCase(unittest.TestCase):
|
|||||||
"""Makes sure a context is passed through rpc call."""
|
"""Makes sure a context is passed through rpc call."""
|
||||||
value = 42
|
value = 42
|
||||||
result = self.rpc.call(FLAGS, self.context,
|
result = self.rpc.call(FLAGS, self.context,
|
||||||
self.topic, {"method": "context",
|
self.topic, {"method": "context",
|
||||||
"args": {"value": value}})
|
"args": {"value": value}})
|
||||||
self.assertEqual(self.context.to_dict(), result)
|
self.assertEqual(self.context.to_dict(), result)
|
||||||
|
|
||||||
def _test_cast(self, fanout=False):
|
def _test_cast(self, fanout=False):
|
||||||
@@ -189,14 +189,14 @@ class BaseRpcTestCase(unittest.TestCase):
|
|||||||
def echo(context, queue, value):
|
def echo(context, queue, value):
|
||||||
"""Calls echo in the passed queue."""
|
"""Calls echo in the passed queue."""
|
||||||
LOG.debug(_("Nested received %(queue)s, %(value)s")
|
LOG.debug(_("Nested received %(queue)s, %(value)s")
|
||||||
% locals())
|
% locals())
|
||||||
# TODO(comstud):
|
# TODO(comstud):
|
||||||
# so, it will replay the context and use the same REQID?
|
# so, it will replay the context and use the same REQID?
|
||||||
# that's bizarre.
|
# that's bizarre.
|
||||||
ret = self.rpc.call(FLAGS, context,
|
ret = self.rpc.call(FLAGS, context,
|
||||||
queue,
|
queue,
|
||||||
{"method": "echo",
|
{"method": "echo",
|
||||||
"args": {"value": value}})
|
"args": {"value": value}})
|
||||||
LOG.debug(_("Nested return %s"), ret)
|
LOG.debug(_("Nested return %s"), ret)
|
||||||
return value
|
return value
|
||||||
|
|
||||||
@@ -228,10 +228,10 @@ class BaseRpcTestCase(unittest.TestCase):
|
|||||||
"args": {"value": value}}, timeout=1)
|
"args": {"value": value}}, timeout=1)
|
||||||
try:
|
try:
|
||||||
self.rpc.call(FLAGS, self.context,
|
self.rpc.call(FLAGS, self.context,
|
||||||
self.topic,
|
self.topic,
|
||||||
{"method": "block",
|
{"method": "block",
|
||||||
"args": {"value": value}},
|
"args": {"value": value}},
|
||||||
timeout=1)
|
timeout=1)
|
||||||
self.fail("should have thrown Timeout")
|
self.fail("should have thrown Timeout")
|
||||||
except rpc_common.Timeout as exc:
|
except rpc_common.Timeout as exc:
|
||||||
pass
|
pass
|
||||||
@@ -272,8 +272,8 @@ class BaseRpcAMQPTestCase(BaseRpcTestCase):
|
|||||||
|
|
||||||
value = 42
|
value = 42
|
||||||
result = self.rpc.call(FLAGS, self.context, self.topic,
|
result = self.rpc.call(FLAGS, self.context, self.topic,
|
||||||
{"method": "echo",
|
{"method": "echo",
|
||||||
"args": {"value": value}})
|
"args": {"value": value}})
|
||||||
self.assertEqual(value, result)
|
self.assertEqual(value, result)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -87,18 +87,22 @@ class RpcDispatcherTestCase(unittest.TestCase):
|
|||||||
self._test_dispatch('3.1', (None, None, self.ctxt, 1))
|
self._test_dispatch('3.1', (None, None, self.ctxt, 1))
|
||||||
|
|
||||||
def test_dispatch_higher_minor_version(self):
|
def test_dispatch_higher_minor_version(self):
|
||||||
self.assertRaises(rpc_common.UnsupportedRpcVersion,
|
self.assertRaises(
|
||||||
self._test_dispatch, '2.6', (None, None, None, None))
|
rpc_common.UnsupportedRpcVersion,
|
||||||
self.assertRaises(rpc_common.UnsupportedRpcVersion,
|
self._test_dispatch, '2.6', (None, None, None, None))
|
||||||
self._test_dispatch, '3.6', (None, None, None, None))
|
self.assertRaises(
|
||||||
|
rpc_common.UnsupportedRpcVersion,
|
||||||
|
self._test_dispatch, '3.6', (None, None, None, None))
|
||||||
|
|
||||||
def test_dispatch_lower_major_version(self):
|
def test_dispatch_lower_major_version(self):
|
||||||
self.assertRaises(rpc_common.UnsupportedRpcVersion,
|
self.assertRaises(
|
||||||
self._test_dispatch, '1.0', (None, None, None, None))
|
rpc_common.UnsupportedRpcVersion,
|
||||||
|
self._test_dispatch, '1.0', (None, None, None, None))
|
||||||
|
|
||||||
def test_dispatch_higher_major_version(self):
|
def test_dispatch_higher_major_version(self):
|
||||||
self.assertRaises(rpc_common.UnsupportedRpcVersion,
|
self.assertRaises(
|
||||||
self._test_dispatch, '4.0', (None, None, None, None))
|
rpc_common.UnsupportedRpcVersion,
|
||||||
|
self._test_dispatch, '4.0', (None, None, None, None))
|
||||||
|
|
||||||
def test_dispatch_no_version_uses_v1(self):
|
def test_dispatch_no_version_uses_v1(self):
|
||||||
v1 = self.API1()
|
v1 = self.API1()
|
||||||
|
|||||||
@@ -51,7 +51,7 @@ class MyException(Exception):
|
|||||||
|
|
||||||
|
|
||||||
def _raise_exc_stub(stubs, times, obj, method, exc_msg,
|
def _raise_exc_stub(stubs, times, obj, method, exc_msg,
|
||||||
exc_class=MyException):
|
exc_class=MyException):
|
||||||
info = {'called': 0}
|
info = {'called': 0}
|
||||||
orig_method = getattr(obj, method)
|
orig_method = getattr(obj, method)
|
||||||
|
|
||||||
@@ -166,13 +166,14 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
class MyConnection(impl_kombu.Connection):
|
class MyConnection(impl_kombu.Connection):
|
||||||
def __init__(myself, *args, **kwargs):
|
def __init__(myself, *args, **kwargs):
|
||||||
super(MyConnection, myself).__init__(*args, **kwargs)
|
super(MyConnection, myself).__init__(*args, **kwargs)
|
||||||
self.assertEqual(myself.params,
|
self.assertEqual(
|
||||||
{'hostname': FLAGS.rabbit_host,
|
myself.params,
|
||||||
'userid': FLAGS.rabbit_userid,
|
{'hostname': FLAGS.rabbit_host,
|
||||||
'password': FLAGS.rabbit_password,
|
'userid': FLAGS.rabbit_userid,
|
||||||
'port': FLAGS.rabbit_port,
|
'password': FLAGS.rabbit_password,
|
||||||
'virtual_host': FLAGS.rabbit_virtual_host,
|
'port': FLAGS.rabbit_port,
|
||||||
'transport': 'memory'})
|
'virtual_host': FLAGS.rabbit_virtual_host,
|
||||||
|
'transport': 'memory'})
|
||||||
|
|
||||||
def topic_send(_context, topic, msg):
|
def topic_send(_context, topic, msg):
|
||||||
pass
|
pass
|
||||||
@@ -198,13 +199,14 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
class MyConnection(impl_kombu.Connection):
|
class MyConnection(impl_kombu.Connection):
|
||||||
def __init__(myself, *args, **kwargs):
|
def __init__(myself, *args, **kwargs):
|
||||||
super(MyConnection, myself).__init__(*args, **kwargs)
|
super(MyConnection, myself).__init__(*args, **kwargs)
|
||||||
self.assertEqual(myself.params,
|
self.assertEqual(
|
||||||
{'hostname': server_params['hostname'],
|
myself.params,
|
||||||
'userid': server_params['username'],
|
{'hostname': server_params['hostname'],
|
||||||
'password': server_params['password'],
|
'userid': server_params['username'],
|
||||||
'port': server_params['port'],
|
'password': server_params['password'],
|
||||||
'virtual_host': server_params['virtual_host'],
|
'port': server_params['port'],
|
||||||
'transport': 'memory'})
|
'virtual_host': server_params['virtual_host'],
|
||||||
|
'transport': 'memory'})
|
||||||
|
|
||||||
def topic_send(_context, topic, msg):
|
def topic_send(_context, topic, msg):
|
||||||
pass
|
pass
|
||||||
@@ -213,7 +215,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
self.stubs.Set(impl_kombu, 'Connection', MyConnection)
|
self.stubs.Set(impl_kombu, 'Connection', MyConnection)
|
||||||
|
|
||||||
impl_kombu.cast_to_server(FLAGS, ctxt, server_params,
|
impl_kombu.cast_to_server(FLAGS, ctxt, server_params,
|
||||||
'fake_topic', {'msg': 'fake'})
|
'fake_topic', {'msg': 'fake'})
|
||||||
|
|
||||||
@testutils.skip_test("kombu memory transport seems buggy with "
|
@testutils.skip_test("kombu memory transport seems buggy with "
|
||||||
"fanout queues as this test passes when "
|
"fanout queues as this test passes when "
|
||||||
@@ -248,11 +250,11 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
# Test that any exception with 'timeout' in it causes a
|
# Test that any exception with 'timeout' in it causes a
|
||||||
# reconnection
|
# reconnection
|
||||||
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
|
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
|
||||||
'__init__', 'foo timeout foo')
|
'__init__', 'foo timeout foo')
|
||||||
|
|
||||||
conn = self.rpc.Connection(FLAGS)
|
conn = self.rpc.Connection(FLAGS)
|
||||||
result = conn.declare_consumer(self.rpc.DirectConsumer,
|
result = conn.declare_consumer(self.rpc.DirectConsumer,
|
||||||
'test_topic', None)
|
'test_topic', None)
|
||||||
|
|
||||||
self.assertEqual(info['called'], 3)
|
self.assertEqual(info['called'], 3)
|
||||||
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
|
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
|
||||||
@@ -262,13 +264,13 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
self.stubs.UnsetAll()
|
self.stubs.UnsetAll()
|
||||||
|
|
||||||
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer,
|
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer,
|
||||||
'__init__', 'meow')
|
'__init__', 'meow')
|
||||||
|
|
||||||
conn = self.rpc.Connection(FLAGS)
|
conn = self.rpc.Connection(FLAGS)
|
||||||
conn.connection_errors = (MyException, )
|
conn.connection_errors = (MyException, )
|
||||||
|
|
||||||
result = conn.declare_consumer(self.rpc.DirectConsumer,
|
result = conn.declare_consumer(self.rpc.DirectConsumer,
|
||||||
'test_topic', None)
|
'test_topic', None)
|
||||||
|
|
||||||
self.assertEqual(info['called'], 2)
|
self.assertEqual(info['called'], 2)
|
||||||
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
|
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
|
||||||
@@ -277,11 +279,11 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
def test_declare_consumer_ioerrors_will_reconnect(self):
|
def test_declare_consumer_ioerrors_will_reconnect(self):
|
||||||
"""Test that an IOError exception causes a reconnection"""
|
"""Test that an IOError exception causes a reconnection"""
|
||||||
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
|
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
|
||||||
'__init__', 'Socket closed', exc_class=IOError)
|
'__init__', 'Socket closed', exc_class=IOError)
|
||||||
|
|
||||||
conn = self.rpc.Connection(FLAGS)
|
conn = self.rpc.Connection(FLAGS)
|
||||||
result = conn.declare_consumer(self.rpc.DirectConsumer,
|
result = conn.declare_consumer(self.rpc.DirectConsumer,
|
||||||
'test_topic', None)
|
'test_topic', None)
|
||||||
|
|
||||||
self.assertEqual(info['called'], 3)
|
self.assertEqual(info['called'], 3)
|
||||||
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
|
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
|
||||||
@@ -292,7 +294,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
# reconnection when declaring the publisher class and when
|
# reconnection when declaring the publisher class and when
|
||||||
# calling send()
|
# calling send()
|
||||||
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
|
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
|
||||||
'__init__', 'foo timeout foo')
|
'__init__', 'foo timeout foo')
|
||||||
|
|
||||||
conn = self.rpc.Connection(FLAGS)
|
conn = self.rpc.Connection(FLAGS)
|
||||||
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
|
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
|
||||||
@@ -301,7 +303,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
self.stubs.UnsetAll()
|
self.stubs.UnsetAll()
|
||||||
|
|
||||||
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
|
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
|
||||||
'send', 'foo timeout foo')
|
'send', 'foo timeout foo')
|
||||||
|
|
||||||
conn = self.rpc.Connection(FLAGS)
|
conn = self.rpc.Connection(FLAGS)
|
||||||
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
|
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
|
||||||
@@ -314,7 +316,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
self.stubs.UnsetAll()
|
self.stubs.UnsetAll()
|
||||||
|
|
||||||
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
|
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
|
||||||
'__init__', 'meow')
|
'__init__', 'meow')
|
||||||
|
|
||||||
conn = self.rpc.Connection(FLAGS)
|
conn = self.rpc.Connection(FLAGS)
|
||||||
conn.connection_errors = (MyException, )
|
conn.connection_errors = (MyException, )
|
||||||
@@ -325,7 +327,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
self.stubs.UnsetAll()
|
self.stubs.UnsetAll()
|
||||||
|
|
||||||
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
|
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
|
||||||
'send', 'meow')
|
'send', 'meow')
|
||||||
|
|
||||||
conn = self.rpc.Connection(FLAGS)
|
conn = self.rpc.Connection(FLAGS)
|
||||||
conn.connection_errors = (MyException, )
|
conn.connection_errors = (MyException, )
|
||||||
@@ -348,7 +350,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
conn.direct_send('a_direct', message)
|
conn.direct_send('a_direct', message)
|
||||||
|
|
||||||
info = _raise_exc_stub(self.stubs, 1, conn.connection,
|
info = _raise_exc_stub(self.stubs, 1, conn.connection,
|
||||||
'drain_events', 'foo timeout foo')
|
'drain_events', 'foo timeout foo')
|
||||||
conn.consume(limit=1)
|
conn.consume(limit=1)
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
@@ -374,9 +376,9 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
"args": {"value": value}})
|
"args": {"value": value}})
|
||||||
try:
|
try:
|
||||||
self.rpc.call(FLAGS, self.context,
|
self.rpc.call(FLAGS, self.context,
|
||||||
'test',
|
'test',
|
||||||
{"method": "fail",
|
{"method": "fail",
|
||||||
"args": {"value": value}})
|
"args": {"value": value}})
|
||||||
self.fail("should have thrown Exception")
|
self.fail("should have thrown Exception")
|
||||||
except NotImplementedError as exc:
|
except NotImplementedError as exc:
|
||||||
self.assertTrue(value in unicode(exc))
|
self.assertTrue(value in unicode(exc))
|
||||||
@@ -404,9 +406,9 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
|
|||||||
"args": {"value": value}})
|
"args": {"value": value}})
|
||||||
try:
|
try:
|
||||||
self.rpc.call(FLAGS, self.context,
|
self.rpc.call(FLAGS, self.context,
|
||||||
'test',
|
'test',
|
||||||
{"method": "fail_converted",
|
{"method": "fail_converted",
|
||||||
"args": {"value": value}})
|
"args": {"value": value}})
|
||||||
self.fail("should have thrown Exception")
|
self.fail("should have thrown Exception")
|
||||||
except exception.ApiError as exc:
|
except exception.ApiError as exc:
|
||||||
self.assertTrue(value in unicode(exc))
|
self.assertTrue(value in unicode(exc))
|
||||||
|
|||||||
@@ -39,14 +39,14 @@ class RpcProxyTestCase(unittest.TestCase):
|
|||||||
super(RpcProxyTestCase, self).tearDown()
|
super(RpcProxyTestCase, self).tearDown()
|
||||||
|
|
||||||
def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False,
|
def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False,
|
||||||
server_params=None, supports_topic_override=True):
|
server_params=None, supports_topic_override=True):
|
||||||
topic = 'fake_topic'
|
topic = 'fake_topic'
|
||||||
timeout = 123
|
timeout = 123
|
||||||
rpc_proxy = proxy.RpcProxy(topic, '1.0')
|
rpc_proxy = proxy.RpcProxy(topic, '1.0')
|
||||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||||
msg = {'method': 'fake_method', 'args': {'x': 'y'}}
|
msg = {'method': 'fake_method', 'args': {'x': 'y'}}
|
||||||
expected_msg = {'method': 'fake_method', 'args': {'x': 'y'},
|
expected_msg = {'method': 'fake_method', 'args': {'x': 'y'},
|
||||||
'version': '1.0'}
|
'version': '1.0'}
|
||||||
|
|
||||||
expected_retval = 'hi' if has_retval else None
|
expected_retval = 'hi' if has_retval else None
|
||||||
|
|
||||||
@@ -120,8 +120,9 @@ class RpcProxyTestCase(unittest.TestCase):
|
|||||||
self._test_rpc_method('cast_to_server', server_params={'blah': 1})
|
self._test_rpc_method('cast_to_server', server_params={'blah': 1})
|
||||||
|
|
||||||
def test_fanout_cast_to_server(self):
|
def test_fanout_cast_to_server(self):
|
||||||
self._test_rpc_method('fanout_cast_to_server',
|
self._test_rpc_method(
|
||||||
server_params={'blah': 1}, supports_topic_override=False)
|
'fanout_cast_to_server',
|
||||||
|
server_params={'blah': 1}, supports_topic_override=False)
|
||||||
|
|
||||||
def test_make_msg(self):
|
def test_make_msg(self):
|
||||||
self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2),
|
self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2),
|
||||||
|
|||||||
@@ -124,20 +124,22 @@ class RpcQpidTestCase(unittest.TestCase):
|
|||||||
self.mock_connection.session().AndReturn(self.mock_session)
|
self.mock_connection.session().AndReturn(self.mock_session)
|
||||||
if fanout:
|
if fanout:
|
||||||
# The link name includes a UUID, so match it with a regex.
|
# The link name includes a UUID, so match it with a regex.
|
||||||
expected_address = mox.Regex(r'^impl_qpid_test_fanout ; '
|
expected_address = mox.Regex(
|
||||||
|
r'^impl_qpid_test_fanout ; '
|
||||||
'{"node": {"x-declare": {"auto-delete": true, "durable": '
|
'{"node": {"x-declare": {"auto-delete": true, "durable": '
|
||||||
'false, "type": "fanout"}, "type": "topic"}, "create": '
|
'false, "type": "fanout"}, "type": "topic"}, "create": '
|
||||||
'"always", "link": {"x-declare": {"auto-delete": true, '
|
'"always", "link": {"x-declare": {"auto-delete": true, '
|
||||||
'"exclusive": true, "durable": false}, "durable": true, '
|
'"exclusive": true, "durable": false}, "durable": true, '
|
||||||
'"name": "impl_qpid_test_fanout_.*"}}$')
|
'"name": "impl_qpid_test_fanout_.*"}}$')
|
||||||
else:
|
else:
|
||||||
expected_address = ('nova/impl_qpid_test ; {"node": {"x-declare": '
|
expected_address = (
|
||||||
|
'nova/impl_qpid_test ; {"node": {"x-declare": '
|
||||||
'{"auto-delete": true, "durable": true}, "type": "topic"}, '
|
'{"auto-delete": true, "durable": true}, "type": "topic"}, '
|
||||||
'"create": "always", "link": {"x-declare": {"auto-delete": '
|
'"create": "always", "link": {"x-declare": {"auto-delete": '
|
||||||
'true, "exclusive": false, "durable": false}, "durable": '
|
'true, "exclusive": false, "durable": false}, "durable": '
|
||||||
'true, "name": "impl_qpid_test"}}')
|
'true, "name": "impl_qpid_test"}}')
|
||||||
self.mock_session.receiver(expected_address).AndReturn(
|
self.mock_session.receiver(expected_address).AndReturn(
|
||||||
self.mock_receiver)
|
self.mock_receiver)
|
||||||
self.mock_receiver.capacity = 1
|
self.mock_receiver.capacity = 1
|
||||||
self.mock_connection.close()
|
self.mock_connection.close()
|
||||||
|
|
||||||
@@ -173,7 +175,7 @@ class RpcQpidTestCase(unittest.TestCase):
|
|||||||
'true, "exclusive": false, "durable": false}, "durable": '
|
'true, "exclusive": false, "durable": false}, "durable": '
|
||||||
'true, "name": "impl.qpid.test.workers"}}')
|
'true, "name": "impl.qpid.test.workers"}}')
|
||||||
self.mock_session.receiver(expected_address).AndReturn(
|
self.mock_session.receiver(expected_address).AndReturn(
|
||||||
self.mock_receiver)
|
self.mock_receiver)
|
||||||
self.mock_receiver.capacity = 1
|
self.mock_receiver.capacity = 1
|
||||||
self.mock_connection.close()
|
self.mock_connection.close()
|
||||||
|
|
||||||
@@ -196,12 +198,14 @@ class RpcQpidTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
self.mock_connection.session().AndReturn(self.mock_session)
|
self.mock_connection.session().AndReturn(self.mock_session)
|
||||||
if fanout:
|
if fanout:
|
||||||
expected_address = ('impl_qpid_test_fanout ; '
|
expected_address = (
|
||||||
|
'impl_qpid_test_fanout ; '
|
||||||
'{"node": {"x-declare": {"auto-delete": true, '
|
'{"node": {"x-declare": {"auto-delete": true, '
|
||||||
'"durable": false, "type": "fanout"}, '
|
'"durable": false, "type": "fanout"}, '
|
||||||
'"type": "topic"}, "create": "always"}')
|
'"type": "topic"}, "create": "always"}')
|
||||||
else:
|
else:
|
||||||
expected_address = ('nova/impl_qpid_test ; {"node": {"x-declare": '
|
expected_address = (
|
||||||
|
'nova/impl_qpid_test ; {"node": {"x-declare": '
|
||||||
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
|
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
|
||||||
'"create": "always"}')
|
'"create": "always"}')
|
||||||
self.mock_session.sender(expected_address).AndReturn(self.mock_sender)
|
self.mock_session.sender(expected_address).AndReturn(self.mock_sender)
|
||||||
@@ -253,12 +257,12 @@ class RpcQpidTestCase(unittest.TestCase):
|
|||||||
def __init__(myself, *args, **kwargs):
|
def __init__(myself, *args, **kwargs):
|
||||||
super(MyConnection, myself).__init__(*args, **kwargs)
|
super(MyConnection, myself).__init__(*args, **kwargs)
|
||||||
self.assertEqual(myself.connection.username,
|
self.assertEqual(myself.connection.username,
|
||||||
server_params['username'])
|
server_params['username'])
|
||||||
self.assertEqual(myself.connection.password,
|
self.assertEqual(myself.connection.password,
|
||||||
server_params['password'])
|
server_params['password'])
|
||||||
self.assertEqual(myself.broker,
|
self.assertEqual(myself.broker,
|
||||||
server_params['hostname'] + ':' +
|
server_params['hostname'] + ':' +
|
||||||
str(server_params['port']))
|
str(server_params['port']))
|
||||||
|
|
||||||
MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
|
MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
|
||||||
self.stubs.Set(impl_qpid, 'Connection', MyConnection)
|
self.stubs.Set(impl_qpid, 'Connection', MyConnection)
|
||||||
@@ -290,43 +294,43 @@ class RpcQpidTestCase(unittest.TestCase):
|
|||||||
self.mock_connection.opened().AndReturn(False)
|
self.mock_connection.opened().AndReturn(False)
|
||||||
self.mock_connection.open()
|
self.mock_connection.open()
|
||||||
self.mock_connection.session().AndReturn(self.mock_session)
|
self.mock_connection.session().AndReturn(self.mock_session)
|
||||||
rcv_addr = mox.Regex(r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
|
rcv_addr = mox.Regex(
|
||||||
' true, "durable": true, "type": "direct"}, "type": '
|
r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
|
||||||
'"topic"}, "create": "always", "link": {"x-declare": '
|
' true, "durable": true, "type": "direct"}, "type": '
|
||||||
'{"auto-delete": true, "exclusive": true, "durable": '
|
'"topic"}, "create": "always", "link": {"x-declare": '
|
||||||
'false}, "durable": true, "name": ".*"}}')
|
'{"auto-delete": true, "exclusive": true, "durable": '
|
||||||
|
'false}, "durable": true, "name": ".*"}}')
|
||||||
self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
|
self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
|
||||||
self.mock_receiver.capacity = 1
|
self.mock_receiver.capacity = 1
|
||||||
send_addr = ('nova/impl_qpid_test ; {"node": {"x-declare": '
|
send_addr = (
|
||||||
|
'nova/impl_qpid_test ; {"node": {"x-declare": '
|
||||||
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
|
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
|
||||||
'"create": "always"}')
|
'"create": "always"}')
|
||||||
self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
|
self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
|
||||||
self.mock_sender.send(mox.IgnoreArg())
|
self.mock_sender.send(mox.IgnoreArg())
|
||||||
|
|
||||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||||
self.mock_receiver)
|
self.mock_receiver)
|
||||||
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
|
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
|
||||||
{"result": "foo", "failure": False, "ending": False}))
|
{"result": "foo", "failure": False, "ending": False}))
|
||||||
self.mock_session.acknowledge(mox.IgnoreArg())
|
self.mock_session.acknowledge(mox.IgnoreArg())
|
||||||
if multi:
|
if multi:
|
||||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||||
self.mock_receiver)
|
self.mock_receiver)
|
||||||
self.mock_receiver.fetch().AndReturn(
|
self.mock_receiver.fetch().AndReturn(
|
||||||
qpid.messaging.Message(
|
qpid.messaging.Message({"result": "bar", "failure": False,
|
||||||
{"result": "bar", "failure": False,
|
"ending": False}))
|
||||||
"ending": False}))
|
|
||||||
self.mock_session.acknowledge(mox.IgnoreArg())
|
self.mock_session.acknowledge(mox.IgnoreArg())
|
||||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||||
self.mock_receiver)
|
self.mock_receiver)
|
||||||
self.mock_receiver.fetch().AndReturn(
|
self.mock_receiver.fetch().AndReturn(
|
||||||
qpid.messaging.Message(
|
qpid.messaging.Message({"result": "baz", "failure": False,
|
||||||
{"result": "baz", "failure": False,
|
"ending": False}))
|
||||||
"ending": False}))
|
|
||||||
self.mock_session.acknowledge(mox.IgnoreArg())
|
self.mock_session.acknowledge(mox.IgnoreArg())
|
||||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||||
self.mock_receiver)
|
self.mock_receiver)
|
||||||
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
|
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
|
||||||
{"failure": False, "ending": True}))
|
{"failure": False, "ending": True}))
|
||||||
self.mock_session.acknowledge(mox.IgnoreArg())
|
self.mock_session.acknowledge(mox.IgnoreArg())
|
||||||
self.mock_session.close()
|
self.mock_session.close()
|
||||||
self.mock_connection.session().AndReturn(self.mock_session)
|
self.mock_connection.session().AndReturn(self.mock_session)
|
||||||
@@ -342,7 +346,7 @@ class RpcQpidTestCase(unittest.TestCase):
|
|||||||
method = impl_qpid.call
|
method = impl_qpid.call
|
||||||
|
|
||||||
res = method(FLAGS, ctx, "impl_qpid_test",
|
res = method(FLAGS, ctx, "impl_qpid_test",
|
||||||
{"method": "test_method", "args": {}})
|
{"method": "test_method", "args": {}})
|
||||||
|
|
||||||
if multi:
|
if multi:
|
||||||
self.assertEquals(list(res), ["foo", "bar", "baz"])
|
self.assertEquals(list(res), ["foo", "bar", "baz"])
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ class _RpcZmqBaseTestCase(common.BaseRpcTestCase):
|
|||||||
self.rpc = impl_zmq
|
self.rpc = impl_zmq
|
||||||
self.rpc.register_opts(FLAGS)
|
self.rpc.register_opts(FLAGS)
|
||||||
FLAGS.set_default('rpc_zmq_matchmaker',
|
FLAGS.set_default('rpc_zmq_matchmaker',
|
||||||
'mod_matchmaker.MatchMakerLocalhost')
|
'mod_matchmaker.MatchMakerLocalhost')
|
||||||
|
|
||||||
# We'll change this if we detect no daemon running.
|
# We'll change this if we detect no daemon running.
|
||||||
ipc_dir = FLAGS.rpc_zmq_ipc_dir
|
ipc_dir = FLAGS.rpc_zmq_ipc_dir
|
||||||
@@ -87,17 +87,18 @@ class _RpcZmqBaseTestCase(common.BaseRpcTestCase):
|
|||||||
consumption_proxy = impl_zmq.InternalContext(None)
|
consumption_proxy = impl_zmq.InternalContext(None)
|
||||||
|
|
||||||
self.reactor.register(consumption_proxy,
|
self.reactor.register(consumption_proxy,
|
||||||
consume_in, zmq.PULL, out_bind=True)
|
consume_in,
|
||||||
|
zmq.PULL,
|
||||||
|
out_bind=True)
|
||||||
self.reactor.consume_in_thread()
|
self.reactor.consume_in_thread()
|
||||||
except zmq.ZMQError:
|
except zmq.ZMQError:
|
||||||
assert False, _("Could not create ZeroMQ receiver daemon. "
|
assert False, _("Could not create ZeroMQ receiver daemon. "
|
||||||
"Socket may already be in use.")
|
"Socket may already be in use.")
|
||||||
except OSError:
|
except OSError:
|
||||||
assert False, _("Could not create IPC directory %s") % \
|
assert False, _("Could not create IPC directory %s") % (ipc_dir, )
|
||||||
(ipc_dir, )
|
|
||||||
finally:
|
finally:
|
||||||
super(_RpcZmqBaseTestCase, self).setUp(
|
super(RpcZmqBaseTestCase, self).setUp(
|
||||||
topic=topic, topic_nested=topic_nested)
|
topic=topic, topic_nested=topic_nested)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
if not impl_zmq:
|
if not impl_zmq:
|
||||||
@@ -128,5 +129,5 @@ class RpcZmqDirectTopicTestCase(_RpcZmqBaseTestCase):
|
|||||||
"""
|
"""
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(RpcZmqDirectTopicTestCase, self).setUp(
|
super(RpcZmqDirectTopicTestCase, self).setUp(
|
||||||
topic='test.localhost',
|
topic='test.localhost',
|
||||||
topic_nested='nested.localhost')
|
topic_nested='nested.localhost')
|
||||||
|
|||||||
@@ -296,8 +296,8 @@ class ConfigFileOptsTestCase(BaseTestCase):
|
|||||||
self.conf.register_opt(opt_class('newfoo', deprecated_name='oldfoo'))
|
self.conf.register_opt(opt_class('newfoo', deprecated_name='oldfoo'))
|
||||||
|
|
||||||
paths2 = self.create_tempfiles([('test',
|
paths2 = self.create_tempfiles([('test',
|
||||||
'[DEFAULT]\n'
|
'[DEFAULT]\n'
|
||||||
'newfoo = %s\n' % value)])
|
'newfoo = %s\n' % value)])
|
||||||
|
|
||||||
self.conf(['--config-file', paths2[0]])
|
self.conf(['--config-file', paths2[0]])
|
||||||
self.assertTrue(hasattr(self.conf, 'newfoo'))
|
self.assertTrue(hasattr(self.conf, 'newfoo'))
|
||||||
@@ -306,8 +306,7 @@ class ConfigFileOptsTestCase(BaseTestCase):
|
|||||||
def test_conf_file_str_default(self):
|
def test_conf_file_str_default(self):
|
||||||
self.conf.register_opt(StrOpt('foo', default='bar'))
|
self.conf.register_opt(StrOpt('foo', default='bar'))
|
||||||
|
|
||||||
paths = self.create_tempfiles([('test',
|
paths = self.create_tempfiles([('test', '[DEFAULT]\n')])
|
||||||
'[DEFAULT]\n')])
|
|
||||||
|
|
||||||
self.conf(['--config-file', paths[0]])
|
self.conf(['--config-file', paths[0]])
|
||||||
|
|
||||||
@@ -317,9 +316,7 @@ class ConfigFileOptsTestCase(BaseTestCase):
|
|||||||
def test_conf_file_str_value(self):
|
def test_conf_file_str_value(self):
|
||||||
self.conf.register_opt(StrOpt('foo'))
|
self.conf.register_opt(StrOpt('foo'))
|
||||||
|
|
||||||
paths = self.create_tempfiles([('test',
|
paths = self.create_tempfiles([('test', '[DEFAULT]\n''foo = bar\n')])
|
||||||
'[DEFAULT]\n'
|
|
||||||
'foo = bar\n')])
|
|
||||||
|
|
||||||
self.conf(['--config-file', paths[0]])
|
self.conf(['--config-file', paths[0]])
|
||||||
|
|
||||||
@@ -577,7 +574,7 @@ class ConfigFileOptsTestCase(BaseTestCase):
|
|||||||
|
|
||||||
def test_conf_file_multistr_values_append_deprecated(self):
|
def test_conf_file_multistr_values_append_deprecated(self):
|
||||||
self.conf.register_cli_opt(MultiStrOpt('foo',
|
self.conf.register_cli_opt(MultiStrOpt('foo',
|
||||||
deprecated_name='oldfoo'))
|
deprecated_name='oldfoo'))
|
||||||
|
|
||||||
paths = self.create_tempfiles([('1',
|
paths = self.create_tempfiles([('1',
|
||||||
'[DEFAULT]\n'
|
'[DEFAULT]\n'
|
||||||
@@ -713,7 +710,7 @@ class OptGroupsTestCase(BaseTestCase):
|
|||||||
def test_arg_group_in_config_file_with_deprecated(self):
|
def test_arg_group_in_config_file_with_deprecated(self):
|
||||||
self.conf.register_group(OptGroup('blaa'))
|
self.conf.register_group(OptGroup('blaa'))
|
||||||
self.conf.register_opt(StrOpt('foo', deprecated_name='oldfoo'),
|
self.conf.register_opt(StrOpt('foo', deprecated_name='oldfoo'),
|
||||||
group='blaa')
|
group='blaa')
|
||||||
|
|
||||||
paths = self.create_tempfiles([('test',
|
paths = self.create_tempfiles([('test',
|
||||||
'[blaa]\n'
|
'[blaa]\n'
|
||||||
@@ -1410,20 +1407,20 @@ class OptDumpingTestCase(BaseTestCase):
|
|||||||
self.conf.log_opt_values(logger, 666)
|
self.conf.log_opt_values(logger, 666)
|
||||||
|
|
||||||
self.assertEquals(logger.logged, [
|
self.assertEquals(logger.logged, [
|
||||||
"*" * 80,
|
"*" * 80,
|
||||||
"Configuration options gathered from:",
|
"Configuration options gathered from:",
|
||||||
"command line args: ['--foo', 'this', '--blaa-bar', 'that', "
|
"command line args: ['--foo', 'this', '--blaa-bar', "
|
||||||
"'--blaa-key', 'admin', '--passwd', 'hush']",
|
"'that', '--blaa-key', 'admin', '--passwd', 'hush']",
|
||||||
"config files: []",
|
"config files: []",
|
||||||
"=" * 80,
|
"=" * 80,
|
||||||
"config_dir = None",
|
"config_dir = None",
|
||||||
"config_file = []",
|
"config_file = []",
|
||||||
"foo = this",
|
"foo = this",
|
||||||
"passwd = ****",
|
"passwd = ****",
|
||||||
"blaa.bar = that",
|
"blaa.bar = that",
|
||||||
"blaa.key = *****",
|
"blaa.key = *****",
|
||||||
"*" * 80,
|
"*" * 80,
|
||||||
])
|
])
|
||||||
|
|
||||||
|
|
||||||
class CommonOptsTestCase(BaseTestCase):
|
class CommonOptsTestCase(BaseTestCase):
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ class TestConfig(unittest.TestCase):
|
|||||||
|
|
||||||
# Test that an app cannot be configured
|
# Test that an app cannot be configured
|
||||||
with mock.patch('openstack.common.config.find_config_file',
|
with mock.patch('openstack.common.config.find_config_file',
|
||||||
mock.Mock(return_value=True)):
|
mock.Mock(return_value=True)):
|
||||||
self.assertRaises(RuntimeError, config.load_paste_config,
|
self.assertRaises(RuntimeError, config.load_paste_config,
|
||||||
'fake_app', {}, [])
|
'fake_app', {}, [])
|
||||||
|
|
||||||
|
|||||||
@@ -65,8 +65,9 @@ class ResourceExtensionTest(unittest.TestCase):
|
|||||||
return {'collection': 'value'}
|
return {'collection': 'value'}
|
||||||
|
|
||||||
def test_resource_can_be_added_as_extension(self):
|
def test_resource_can_be_added_as_extension(self):
|
||||||
res_ext = extensions.ResourceExtension('tweedles',
|
res_ext = extensions.ResourceExtension(
|
||||||
self.ResourceExtensionController())
|
'tweedles',
|
||||||
|
self.ResourceExtensionController())
|
||||||
test_app = setup_extensions_app(SimpleExtensionManager(res_ext))
|
test_app = setup_extensions_app(SimpleExtensionManager(res_ext))
|
||||||
|
|
||||||
index_response = test_app.get("/tweedles")
|
index_response = test_app.get("/tweedles")
|
||||||
@@ -179,8 +180,9 @@ class ActionExtensionTest(unittest.TestCase):
|
|||||||
action_name = 'FOXNSOX:add_tweedle'
|
action_name = 'FOXNSOX:add_tweedle'
|
||||||
action_params = dict(name='Beetle')
|
action_params = dict(name='Beetle')
|
||||||
req_body = json.dumps({action_name: action_params})
|
req_body = json.dumps({action_name: action_params})
|
||||||
response = self.extension_app.post('/dummy_resources/1/action',
|
response = self.extension_app.post(
|
||||||
req_body, content_type='application/json')
|
'/dummy_resources/1/action',
|
||||||
|
req_body, content_type='application/json')
|
||||||
|
|
||||||
self.assertEqual("Tweedle Beetle Added.", response.json)
|
self.assertEqual("Tweedle Beetle Added.", response.json)
|
||||||
|
|
||||||
@@ -188,8 +190,9 @@ class ActionExtensionTest(unittest.TestCase):
|
|||||||
action_name = 'FOXNSOX:delete_tweedle'
|
action_name = 'FOXNSOX:delete_tweedle'
|
||||||
action_params = dict(name='Bailey')
|
action_params = dict(name='Bailey')
|
||||||
req_body = json.dumps({action_name: action_params})
|
req_body = json.dumps({action_name: action_params})
|
||||||
response = self.extension_app.post("/dummy_resources/1/action",
|
response = self.extension_app.post(
|
||||||
req_body, content_type='application/json')
|
"/dummy_resources/1/action",
|
||||||
|
req_body, content_type='application/json')
|
||||||
self.assertEqual("Tweedle Bailey Deleted.", response.json)
|
self.assertEqual("Tweedle Bailey Deleted.", response.json)
|
||||||
|
|
||||||
def test_returns_404_for_non_existant_action(self):
|
def test_returns_404_for_non_existant_action(self):
|
||||||
@@ -197,9 +200,10 @@ class ActionExtensionTest(unittest.TestCase):
|
|||||||
action_params = dict(name="test")
|
action_params = dict(name="test")
|
||||||
req_body = json.dumps({non_existant_action: action_params})
|
req_body = json.dumps({non_existant_action: action_params})
|
||||||
|
|
||||||
response = self.extension_app.post("/dummy_resources/1/action",
|
response = self.extension_app.post(
|
||||||
req_body, content_type='application/json',
|
"/dummy_resources/1/action",
|
||||||
status='*')
|
req_body, content_type='application/json',
|
||||||
|
status='*')
|
||||||
|
|
||||||
self.assertEqual(404, response.status_int)
|
self.assertEqual(404, response.status_int)
|
||||||
|
|
||||||
@@ -208,8 +212,9 @@ class ActionExtensionTest(unittest.TestCase):
|
|||||||
action_params = dict(name='Beetle')
|
action_params = dict(name='Beetle')
|
||||||
req_body = json.dumps({action_name: action_params})
|
req_body = json.dumps({action_name: action_params})
|
||||||
|
|
||||||
response = self.extension_app.post("/asdf/1/action", req_body,
|
response = self.extension_app.post(
|
||||||
content_type='application/json', status='*')
|
"/asdf/1/action", req_body,
|
||||||
|
content_type='application/json', status='*')
|
||||||
self.assertEqual(404, response.status_int)
|
self.assertEqual(404, response.status_int)
|
||||||
|
|
||||||
|
|
||||||
@@ -226,7 +231,7 @@ class RequestExtensionTest(unittest.TestCase):
|
|||||||
headers={'X-NEW-REQUEST-HEADER': "sox"})
|
headers={'X-NEW-REQUEST-HEADER': "sox"})
|
||||||
|
|
||||||
self.assertEqual(response.headers['X-NEW-RESPONSE-HEADER'],
|
self.assertEqual(response.headers['X-NEW-RESPONSE-HEADER'],
|
||||||
"response_header_data")
|
"response_header_data")
|
||||||
|
|
||||||
def test_extend_get_resource_response(self):
|
def test_extend_get_resource_response(self):
|
||||||
def extend_response_data(req, res):
|
def extend_response_data(req, res):
|
||||||
@@ -269,15 +274,17 @@ class RequestExtensionTest(unittest.TestCase):
|
|||||||
self.assertEqual(response.json['uneditable'], "original_value")
|
self.assertEqual(response.json['uneditable'], "original_value")
|
||||||
|
|
||||||
ext_app = self._setup_app_with_request_handler(_update_handler,
|
ext_app = self._setup_app_with_request_handler(_update_handler,
|
||||||
'PUT')
|
'PUT')
|
||||||
ext_response = ext_app.put("/dummy_resources/1",
|
ext_response = ext_app.put(
|
||||||
json.dumps({'uneditable': "new_value"}),
|
"/dummy_resources/1",
|
||||||
headers={'Content-Type': "application/json"})
|
json.dumps({'uneditable': "new_value"}),
|
||||||
|
headers={'Content-Type': "application/json"})
|
||||||
self.assertEqual(ext_response.json['uneditable'], "new_value")
|
self.assertEqual(ext_response.json['uneditable'], "new_value")
|
||||||
|
|
||||||
def _setup_app_with_request_handler(self, handler, verb):
|
def _setup_app_with_request_handler(self, handler, verb):
|
||||||
req_ext = extensions.RequestExtension(verb,
|
req_ext = extensions.RequestExtension(
|
||||||
'/dummy_resources/:(id)', handler)
|
verb,
|
||||||
|
'/dummy_resources/:(id)', handler)
|
||||||
manager = SimpleExtensionManager(None, None, req_ext)
|
manager = SimpleExtensionManager(None, None, req_ext)
|
||||||
return setup_extensions_app(manager)
|
return setup_extensions_app(manager)
|
||||||
|
|
||||||
@@ -312,7 +319,8 @@ class ExtensionControllerTest(unittest.TestCase):
|
|||||||
response = self.test_app.get("/extensions")
|
response = self.test_app.get("/extensions")
|
||||||
foxnsox = response.json["extensions"][0]
|
foxnsox = response.json["extensions"][0]
|
||||||
|
|
||||||
self.assertEqual(foxnsox, {
|
self.assertEqual(
|
||||||
|
foxnsox, {
|
||||||
'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0',
|
'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0',
|
||||||
'name': 'Fox In Socks',
|
'name': 'Fox In Socks',
|
||||||
'updated': '2011-01-22T13:25:27-06:00',
|
'updated': '2011-01-22T13:25:27-06:00',
|
||||||
@@ -326,7 +334,8 @@ class ExtensionControllerTest(unittest.TestCase):
|
|||||||
json_response = self.test_app.get("/extensions/FOXNSOX").json
|
json_response = self.test_app.get("/extensions/FOXNSOX").json
|
||||||
foxnsox = json_response['extension']
|
foxnsox = json_response['extension']
|
||||||
|
|
||||||
self.assertEqual(foxnsox, {
|
self.assertEqual(
|
||||||
|
foxnsox, {
|
||||||
'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0',
|
'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0',
|
||||||
'name': 'Fox In Socks',
|
'name': 'Fox In Socks',
|
||||||
'updated': '2011-01-22T13:25:27-06:00',
|
'updated': '2011-01-22T13:25:27-06:00',
|
||||||
@@ -352,10 +361,12 @@ class ExtensionControllerTest(unittest.TestCase):
|
|||||||
exts = root.findall('{0}extension'.format(NS))
|
exts = root.findall('{0}extension'.format(NS))
|
||||||
fox_ext = exts[0]
|
fox_ext = exts[0]
|
||||||
self.assertEqual(fox_ext.get('name'), 'Fox In Socks')
|
self.assertEqual(fox_ext.get('name'), 'Fox In Socks')
|
||||||
self.assertEqual(fox_ext.get('namespace'),
|
self.assertEqual(
|
||||||
|
fox_ext.get('namespace'),
|
||||||
'http://www.fox.in.socks/api/ext/pie/v1.0')
|
'http://www.fox.in.socks/api/ext/pie/v1.0')
|
||||||
self.assertEqual(fox_ext.get('updated'), '2011-01-22T13:25:27-06:00')
|
self.assertEqual(fox_ext.get('updated'), '2011-01-22T13:25:27-06:00')
|
||||||
self.assertEqual(fox_ext.findtext('{0}description'.format(NS)),
|
self.assertEqual(
|
||||||
|
fox_ext.findtext('{0}description'.format(NS)),
|
||||||
'The Fox In Socks Extension')
|
'The Fox In Socks Extension')
|
||||||
|
|
||||||
def test_get_extension_xml(self):
|
def test_get_extension_xml(self):
|
||||||
@@ -367,10 +378,12 @@ class ExtensionControllerTest(unittest.TestCase):
|
|||||||
self.assertEqual(root.tag.split('extension')[0], NS)
|
self.assertEqual(root.tag.split('extension')[0], NS)
|
||||||
self.assertEqual(root.get('alias'), 'FOXNSOX')
|
self.assertEqual(root.get('alias'), 'FOXNSOX')
|
||||||
self.assertEqual(root.get('name'), 'Fox In Socks')
|
self.assertEqual(root.get('name'), 'Fox In Socks')
|
||||||
self.assertEqual(root.get('namespace'),
|
self.assertEqual(
|
||||||
|
root.get('namespace'),
|
||||||
'http://www.fox.in.socks/api/ext/pie/v1.0')
|
'http://www.fox.in.socks/api/ext/pie/v1.0')
|
||||||
self.assertEqual(root.get('updated'), '2011-01-22T13:25:27-06:00')
|
self.assertEqual(root.get('updated'), '2011-01-22T13:25:27-06:00')
|
||||||
self.assertEqual(root.findtext('{0}description'.format(NS)),
|
self.assertEqual(
|
||||||
|
root.findtext('{0}description'.format(NS)),
|
||||||
'The Fox In Socks Extension')
|
'The Fox In Socks Extension')
|
||||||
|
|
||||||
|
|
||||||
@@ -399,13 +412,15 @@ class ExtensionsXMLSerializerTest(unittest.TestCase):
|
|||||||
|
|
||||||
def test_serialize_extenstion(self):
|
def test_serialize_extenstion(self):
|
||||||
serializer = extensions.ExtensionsXMLSerializer()
|
serializer = extensions.ExtensionsXMLSerializer()
|
||||||
data = {'extension': {
|
data = {
|
||||||
'name': 'ext1',
|
'extension': {
|
||||||
'namespace': 'http://docs.rack.com/servers/api/ext/pie/v1.0',
|
'name': 'ext1',
|
||||||
'alias': 'RS-PIE',
|
'namespace': 'http://docs.rack.com/servers/api/ext/pie/v1.0',
|
||||||
'updated': '2011-01-22T13:25:27-06:00',
|
'alias': 'RS-PIE',
|
||||||
'description': 'Adds the capability to share an image.',
|
'updated': '2011-01-22T13:25:27-06:00',
|
||||||
'links': [{'rel': 'describedby',
|
'description': 'Adds the capability to share an image.',
|
||||||
|
'links': [
|
||||||
|
{'rel': 'describedby',
|
||||||
'type': 'application/pdf',
|
'type': 'application/pdf',
|
||||||
'href': 'http://docs.rack.com/servers/api/ext/cs.pdf'},
|
'href': 'http://docs.rack.com/servers/api/ext/cs.pdf'},
|
||||||
{'rel': 'describedby',
|
{'rel': 'describedby',
|
||||||
@@ -416,7 +431,7 @@ class ExtensionsXMLSerializerTest(unittest.TestCase):
|
|||||||
root = etree.XML(xml)
|
root = etree.XML(xml)
|
||||||
ext_dict = data['extension']
|
ext_dict = data['extension']
|
||||||
self.assertEqual(root.findtext('{0}description'.format(NS)),
|
self.assertEqual(root.findtext('{0}description'.format(NS)),
|
||||||
ext_dict['description'])
|
ext_dict['description'])
|
||||||
|
|
||||||
for key in ['name', 'namespace', 'alias', 'updated']:
|
for key in ['name', 'namespace', 'alias', 'updated']:
|
||||||
self.assertEqual(root.get(key), ext_dict[key])
|
self.assertEqual(root.get(key), ext_dict[key])
|
||||||
@@ -429,30 +444,31 @@ class ExtensionsXMLSerializerTest(unittest.TestCase):
|
|||||||
|
|
||||||
def test_serialize_extensions(self):
|
def test_serialize_extensions(self):
|
||||||
serializer = extensions.ExtensionsXMLSerializer()
|
serializer = extensions.ExtensionsXMLSerializer()
|
||||||
data = {"extensions": [{
|
data = {
|
||||||
|
"extensions": [{
|
||||||
"name": "Public Image Extension",
|
"name": "Public Image Extension",
|
||||||
"namespace": "http://foo.com/api/ext/pie/v1.0",
|
"namespace": "http://foo.com/api/ext/pie/v1.0",
|
||||||
"alias": "RS-PIE",
|
"alias": "RS-PIE",
|
||||||
"updated": "2011-01-22T13:25:27-06:00",
|
"updated": "2011-01-22T13:25:27-06:00",
|
||||||
"description": "Adds the capability to share an image.",
|
"description": "Adds the capability to share an image.",
|
||||||
"links": [{"rel": "describedby",
|
"links": [{"rel": "describedby",
|
||||||
"type": "application/pdf",
|
"type": "application/pdf",
|
||||||
"type": "application/vnd.sun.wadl+xml",
|
"type": "application/vnd.sun.wadl+xml",
|
||||||
"href": "http://foo.com/api/ext/cs-pie.pdf"},
|
"href": "http://foo.com/api/ext/cs-pie.pdf"},
|
||||||
{"rel": "describedby",
|
{"rel": "describedby",
|
||||||
"type": "application/vnd.sun.wadl+xml",
|
"type": "application/vnd.sun.wadl+xml",
|
||||||
"href": "http://foo.com/api/ext/cs-pie.wadl"}]},
|
"href": "http://foo.com/api/ext/cs-pie.wadl"}]},
|
||||||
{"name": "Cloud Block Storage",
|
{"name": "Cloud Block Storage",
|
||||||
"namespace": "http://foo.com/api/ext/cbs/v1.0",
|
"namespace": "http://foo.com/api/ext/cbs/v1.0",
|
||||||
"alias": "RS-CBS",
|
"alias": "RS-CBS",
|
||||||
"updated": "2011-01-12T11:22:33-06:00",
|
"updated": "2011-01-12T11:22:33-06:00",
|
||||||
"description": "Allows mounting cloud block storage.",
|
"description": "Allows mounting cloud block storage.",
|
||||||
"links": [{"rel": "describedby",
|
"links": [{"rel": "describedby",
|
||||||
"type": "application/pdf",
|
"type": "application/pdf",
|
||||||
"href": "http://foo.com/api/ext/cs-cbs.pdf"},
|
"href": "http://foo.com/api/ext/cs-cbs.pdf"},
|
||||||
{"rel": "describedby",
|
{"rel": "describedby",
|
||||||
"type": "application/vnd.sun.wadl+xml",
|
"type": "application/vnd.sun.wadl+xml",
|
||||||
"href": "http://foo.com/api/ext/cs-cbs.wadl"}]}]}
|
"href": "http://foo.com/api/ext/cs-cbs.wadl"}]}]}
|
||||||
|
|
||||||
xml = serializer.serialize(data, 'index')
|
xml = serializer.serialize(data, 'index')
|
||||||
root = etree.XML(xml)
|
root = etree.XML(xml)
|
||||||
@@ -534,7 +550,7 @@ class ExtensionDescriptorInterfaceTest(unittest.TestCase):
|
|||||||
'get_actions', 'get_request_extensions']
|
'get_actions', 'get_request_extensions']
|
||||||
|
|
||||||
predicate = lambda a: (inspect.ismethod(a) and
|
predicate = lambda a: (inspect.ismethod(a) and
|
||||||
not a.__name__.startswith('_'))
|
not a.__name__.startswith('_'))
|
||||||
for method in inspect.getmembers(extensions.ExtensionDescriptor,
|
for method in inspect.getmembers(extensions.ExtensionDescriptor,
|
||||||
predicate):
|
predicate):
|
||||||
self.assertFalse(method[0] not in contract_methods)
|
self.assertFalse(method[0] not in contract_methods)
|
||||||
|
|||||||
@@ -131,10 +131,12 @@ class BrainTestCase(unittest.TestCase):
|
|||||||
}"""
|
}"""
|
||||||
brain = policy.Brain.load_json(exemplar, "default")
|
brain = policy.Brain.load_json(exemplar, "default")
|
||||||
|
|
||||||
self.assertEqual(brain.rules, dict(
|
self.assertEqual(
|
||||||
|
brain.rules, dict(
|
||||||
admin_or_owner=[["role:admin"], ["project_id:%(project_id)s"]],
|
admin_or_owner=[["role:admin"], ["project_id:%(project_id)s"]],
|
||||||
default=[],
|
default=[],
|
||||||
))
|
)
|
||||||
|
)
|
||||||
self.assertEqual(brain.default_rule, "default")
|
self.assertEqual(brain.default_rule, "default")
|
||||||
|
|
||||||
def test_add_rule(self):
|
def test_add_rule(self):
|
||||||
@@ -142,7 +144,8 @@ class BrainTestCase(unittest.TestCase):
|
|||||||
brain.add_rule("rule1",
|
brain.add_rule("rule1",
|
||||||
[["role:admin"], ["project_id:%(project_id)s"]])
|
[["role:admin"], ["project_id:%(project_id)s"]])
|
||||||
|
|
||||||
self.assertEqual(brain.rules, dict(
|
self.assertEqual(
|
||||||
|
brain.rules, dict(
|
||||||
rule1=[["role:admin"], ["project_id:%(project_id)s"]]))
|
rule1=[["role:admin"], ["project_id:%(project_id)s"]]))
|
||||||
|
|
||||||
def test_check_with_badmatch(self):
|
def test_check_with_badmatch(self):
|
||||||
@@ -384,8 +387,8 @@ class HttpBrainTestCase(unittest.TestCase):
|
|||||||
self.assertEqual(result, False)
|
self.assertEqual(result, False)
|
||||||
self.assertEqual(self.url, "//spam.example.org/spam")
|
self.assertEqual(self.url, "//spam.example.org/spam")
|
||||||
self.assertEqual(self.decode_post_data(), dict(
|
self.assertEqual(self.decode_post_data(), dict(
|
||||||
target=dict(tenant="spam"),
|
target=dict(tenant="spam"),
|
||||||
credentials=dict(roles=["a", "b", "c"])))
|
credentials=dict(roles=["a", "b", "c"])))
|
||||||
|
|
||||||
def test_http_true(self):
|
def test_http_true(self):
|
||||||
self.urlopen_result = "True"
|
self.urlopen_result = "True"
|
||||||
@@ -397,5 +400,5 @@ class HttpBrainTestCase(unittest.TestCase):
|
|||||||
self.assertEqual(result, True)
|
self.assertEqual(result, True)
|
||||||
self.assertEqual(self.url, "//spam.example.org/spam")
|
self.assertEqual(self.url, "//spam.example.org/spam")
|
||||||
self.assertEqual(self.decode_post_data(), dict(
|
self.assertEqual(self.decode_post_data(), dict(
|
||||||
target=dict(tenant="spam"),
|
target=dict(tenant="spam"),
|
||||||
credentials=dict(roles=["a", "b", "c"])))
|
credentials=dict(roles=["a", "b", "c"])))
|
||||||
|
|||||||
@@ -38,19 +38,19 @@ class SetupTest(unittest.TestCase):
|
|||||||
with open(self.mailmap, 'w') as mm_fh:
|
with open(self.mailmap, 'w') as mm_fh:
|
||||||
mm_fh.write("Foo Bar <email@foo.com> Foo Bar <email@bar.com>\n")
|
mm_fh.write("Foo Bar <email@foo.com> Foo Bar <email@bar.com>\n")
|
||||||
self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
|
self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
|
||||||
parse_mailmap(self.mailmap))
|
parse_mailmap(self.mailmap))
|
||||||
|
|
||||||
def test_mailmap_with_firstname(self):
|
def test_mailmap_with_firstname(self):
|
||||||
with open(self.mailmap, 'w') as mm_fh:
|
with open(self.mailmap, 'w') as mm_fh:
|
||||||
mm_fh.write("Foo <email@foo.com> Foo <email@bar.com>\n")
|
mm_fh.write("Foo <email@foo.com> Foo <email@bar.com>\n")
|
||||||
self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
|
self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
|
||||||
parse_mailmap(self.mailmap))
|
parse_mailmap(self.mailmap))
|
||||||
|
|
||||||
def test_mailmap_with_noname(self):
|
def test_mailmap_with_noname(self):
|
||||||
with open(self.mailmap, 'w') as mm_fh:
|
with open(self.mailmap, 'w') as mm_fh:
|
||||||
mm_fh.write("<email@foo.com> <email@bar.com>\n")
|
mm_fh.write("<email@foo.com> <email@bar.com>\n")
|
||||||
self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
|
self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
|
||||||
parse_mailmap(self.mailmap))
|
parse_mailmap(self.mailmap))
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
if os.path.exists(self.mailmap):
|
if os.path.exists(self.mailmap):
|
||||||
|
|||||||
@@ -412,8 +412,9 @@ class ResourceTest(unittest.TestCase):
|
|||||||
|
|
||||||
def test_malformed_request_body_throws_bad_request(self):
|
def test_malformed_request_body_throws_bad_request(self):
|
||||||
resource = wsgi.Resource(None)
|
resource = wsgi.Resource(None)
|
||||||
request = wsgi.Request.blank("/", body="{mal:formed", method='POST',
|
request = wsgi.Request.blank(
|
||||||
headers={'Content-Type': "application/json"})
|
"/", body="{mal:formed", method='POST',
|
||||||
|
headers={'Content-Type': "application/json"})
|
||||||
|
|
||||||
response = resource(request)
|
response = resource(request)
|
||||||
self.assertEqual(response.status, '400 Bad Request')
|
self.assertEqual(response.status, '400 Bad Request')
|
||||||
@@ -421,7 +422,7 @@ class ResourceTest(unittest.TestCase):
|
|||||||
def test_wrong_content_type_throws_unsupported_media_type_error(self):
|
def test_wrong_content_type_throws_unsupported_media_type_error(self):
|
||||||
resource = wsgi.Resource(None)
|
resource = wsgi.Resource(None)
|
||||||
request = wsgi.Request.blank("/", body="{some:json}", method='POST',
|
request = wsgi.Request.blank("/", body="{some:json}", method='POST',
|
||||||
headers={'Content-Type': "xxx"})
|
headers={'Content-Type': "xxx"})
|
||||||
|
|
||||||
response = resource(request)
|
response = resource(request)
|
||||||
self.assertEqual(response.status, '415 Unsupported Media Type')
|
self.assertEqual(response.status, '415 Unsupported Media Type')
|
||||||
|
|||||||
Reference in New Issue
Block a user