Sync openstack common and add policy

openstack-common.conf now includes network_utils and policy

The default kombu implementation for openstack.common.rpc seems to
depend on network_utils.

Fixes: bug #1058353

Change-Id: I2526c7355e7f2be67b25bf22854c56ec65741d9a
Clay Gerrard 2012-09-28 16:27:09 -05:00
parent 2a15ffc589
commit 4cfb90a9b8
11 changed files with 498 additions and 145 deletions

cinder/openstack/common/network_utils.py

@@ -0,0 +1,68 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Network-related utilities and helper functions.
"""
import logging
LOG = logging.getLogger(__name__)
def parse_host_port(address, default_port=None):
"""
Interpret a string as a host:port pair.
An IPv6 address MUST be escaped if accompanied by a port,
because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
means both [2001:db8:85a3::8a2e:370:7334] and
[2001:db8:85a3::8a2e:370]:7334.
>>> parse_host_port('server01:80')
('server01', 80)
>>> parse_host_port('server01')
('server01', None)
>>> parse_host_port('server01', default_port=1234)
('server01', 1234)
>>> parse_host_port('[::1]:80')
('::1', 80)
>>> parse_host_port('[::1]')
('::1', None)
>>> parse_host_port('[::1]', default_port=1234)
('::1', 1234)
>>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
('2001:db8:85a3::8a2e:370:7334', 1234)
"""
if address[0] == '[':
# Escaped ipv6
_host, _port = address[1:].split(']')
host = _host
if ':' in _port:
port = _port.split(':')[1]
else:
port = default_port
else:
if address.count(':') == 1:
host, port = address.split(':')
else:
# 0 means ipv4, >1 means ipv6.
# We prohibit unescaped ipv6 addresses with port.
host = address
port = default_port
return (host, None if port is None else int(port))
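As a usage sketch (the broker strings are invented), this helper is what lets
a rabbit_hosts entry omit its port and inherit a configured default:

    from cinder.openstack.common.network_utils import parse_host_port

    # Entries as they might appear in a rabbit_hosts list; each falls
    # back to the supplied default port when none is given explicitly.
    for adr in ['rabbit1', 'rabbit2:5673', '[2001:db8::1]:5674']:
        host, port = parse_host_port(adr, default_port=5672)
        print host, port
    # rabbit1 5672
    # rabbit2 5673
    # 2001:db8::1 5674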

cinder/openstack/common/policy.py

@@ -17,14 +17,15 @@

 """Common Policy Engine Implementation"""

+import logging
 import urllib
 import urllib2

+from cinder.openstack.common.gettextutils import _
 from cinder.openstack.common import jsonutils


-class NotAuthorized(Exception):
-    pass
+LOG = logging.getLogger(__name__)


 _BRAIN = None

@@ -46,7 +47,8 @@ def reset():
     _BRAIN = None


-def enforce(match_list, target_dict, credentials_dict):
+def enforce(match_list, target_dict, credentials_dict, exc=None,
+            *args, **kwargs):
     """Enforces authorization of some rules against credentials.

     :param match_list: nested tuples of data to match against

@@ -107,18 +109,35 @@ def enforce(match_list, target_dict, credentials_dict):
     Credentials dicts contain as much information as we can about the user
     performing the action.

-    :raises NotAuthorized: if the check fails
+    :param exc: exception to raise
+
+        Class of the exception to raise if the check fails.  Any remaining
+        arguments passed to enforce() (both positional and keyword
+        arguments) will be passed to the exception class.  If exc is not
+        provided, returns False.
+
+    :return: True if the policy allows the action
+    :return: False if the policy does not allow the action and exc is not set
     """
     global _BRAIN
     if not _BRAIN:
         _BRAIN = Brain()
     if not _BRAIN.check(match_list, target_dict, credentials_dict):
-        raise NotAuthorized()
+        if exc:
+            raise exc(*args, **kwargs)
+        return False
+    return True


 class Brain(object):
     """Implements policy checking."""
+
+    _checks = {}
+
+    @classmethod
+    def _register(cls, name, func):
+        cls._checks[name] = func
+
     @classmethod
     def load_json(cls, data, default_rule=None):
         """Init a brain using json instead of a rules dictionary."""

@@ -126,6 +145,11 @@ class Brain(object):
         return cls(rules=rules_dict, default_rule=default_rule)

     def __init__(self, rules=None, default_rule=None):
+        if self.__class__ != Brain:
+            LOG.warning(_("Inheritance-based rules are deprecated; use "
+                          "the default brain instead of %s.") %
+                        self.__class__.__name__)
+
         self.rules = rules or {}
         self.default_rule = default_rule

@@ -133,16 +157,31 @@ class Brain(object):
         self.rules[key] = match

     def _check(self, match, target_dict, cred_dict):
-        match_kind, match_value = match.split(':', 1)
         try:
-            f = getattr(self, '_check_%s' % match_kind)
+            match_kind, match_value = match.split(':', 1)
+        except Exception:
+            LOG.exception(_("Failed to understand rule %(match)r") % locals())
+            # If the rule is invalid, fail closed
+            return False
+
+        func = None
+        try:
+            old_func = getattr(self, '_check_%s' % match_kind)
         except AttributeError:
-            if not self._check_generic(match, target_dict, cred_dict):
-                return False
+            func = self._checks.get(match_kind, self._checks.get(None, None))
         else:
-            if not f(match_value, target_dict, cred_dict):
-                return False
-        return True
+            LOG.warning(_("Inheritance-based rules are deprecated; update "
+                          "_check_%s") % match_kind)
+            func = lambda brain, kind, value, target, cred: old_func(value,
                                                                      target,
                                                                      cred)
+
+        if not func:
+            LOG.error(_("No handler for matches of kind %s") % match_kind)
+            # Fail closed
+            return False
+
+        return func(self, match_kind, match_value, target_dict, cred_dict)

     def check(self, match_list, target_dict, cred_dict):
         """Checks authorization of some rules against credentials.

@@ -166,58 +205,97 @@ class Brain(object):
                 return True
         return False

-    def _check_rule(self, match, target_dict, cred_dict):
-        """Recursively checks credentials based on the brains rules."""
-        try:
-            new_match_list = self.rules[match]
-        except KeyError:
-            if self.default_rule and match != self.default_rule:
-                new_match_list = ('rule:%s' % self.default_rule,)
-            else:
-                return False
-
-        return self.check(new_match_list, target_dict, cred_dict)
-
-    def _check_role(self, match, target_dict, cred_dict):
-        """Check that there is a matching role in the cred dict."""
-        return match.lower() in [x.lower() for x in cred_dict['roles']]
-
-    def _check_generic(self, match, target_dict, cred_dict):
-        """Check an individual match.
-
-        Matches look like:
-
-            tenant:%(tenant_id)s
-            role:compute:admin
-        """
-        # TODO(termie): do dict inspection via dot syntax
-        match = match % target_dict
-        key, value = match.split(':', 1)
-        if key in cred_dict:
-            return value == cred_dict[key]
-        return False
-

 class HttpBrain(Brain):
     """A brain that can check external urls for policy.

     Posts json blobs for target and credentials.
+
+    Note that this brain is deprecated; the http check is registered
+    by default.
     """

-    def _check_http(self, match, target_dict, cred_dict):
-        """Check http: rules by calling to a remote server.
-
-        This example implementation simply verifies that the response is
-        exactly 'True'. A custom brain using response codes could easily
-        be implemented.
-        """
-        url = match % target_dict
-        data = {'target': jsonutils.dumps(target_dict),
-                'credentials': jsonutils.dumps(cred_dict)}
-        post_data = urllib.urlencode(data)
-        f = urllib2.urlopen(url, post_data)
-        return f.read() == "True"
+    pass
+
+
+def register(name, func=None):
+    """
+    Register a function as a policy check.
+
+    :param name: Gives the name of the check type, e.g., 'rule',
+                 'role', etc.  If name is None, a default function
+                 will be registered.
+    :param func: If given, provides the function to register.  If not
+                 given, returns a function taking one argument to
+                 specify the function to register, allowing use as a
+                 decorator.
+    """
+
+    # Perform the actual decoration by registering the function.
+    # Returns the function for compliance with the decorator
+    # interface.
+    def decorator(func):
+        # Register the function
+        Brain._register(name, func)
+        return func
+
+    # If the function is given, do the registration
+    if func:
+        return decorator(func)
+    return decorator
+
+
+@register("rule")
+def _check_rule(brain, match_kind, match, target_dict, cred_dict):
+    """Recursively checks credentials based on the brains rules."""
+    try:
+        new_match_list = brain.rules[match]
+    except KeyError:
+        if brain.default_rule and match != brain.default_rule:
+            new_match_list = ('rule:%s' % brain.default_rule,)
+        else:
+            return False
+
+    return brain.check(new_match_list, target_dict, cred_dict)
+
+
+@register("role")
+def _check_role(brain, match_kind, match, target_dict, cred_dict):
+    """Check that there is a matching role in the cred dict."""
+    return match.lower() in [x.lower() for x in cred_dict['roles']]
+
+
+@register('http')
+def _check_http(brain, match_kind, match, target_dict, cred_dict):
+    """Check http: rules by calling to a remote server.

+    This example implementation simply verifies that the response is
+    exactly 'True'. A custom brain using response codes could easily
+    be implemented.
+    """
+    url = 'http:' + (match % target_dict)
+    data = {'target': jsonutils.dumps(target_dict),
+            'credentials': jsonutils.dumps(cred_dict)}
+    post_data = urllib.urlencode(data)
+    f = urllib2.urlopen(url, post_data)
+    return f.read() == "True"
+
+
+@register(None)
+def _check_generic(brain, match_kind, match, target_dict, cred_dict):
+    """Check an individual match.
+
+    Matches look like:
+
+        tenant:%(tenant_id)s
+        role:compute:admin
+    """
+    # TODO(termie): do dict inspection via dot syntax
+    match = match % target_dict
+    if match_kind in cred_dict:
+        return match == unicode(cred_dict[match_kind])
+    return False
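A minimal sketch of the new extension points, assuming the synced module is
importable as cinder.openstack.common.policy; the 'secret' check kind and the
Forbidden class are invented for illustration:

    from cinder.openstack.common import policy


    class Forbidden(Exception):
        def __init__(self, action):
            super(Forbidden, self).__init__('not authorized: %s' % action)


    # Checks are now plain functions registered by kind rather than
    # Brain methods; the brain calls them as (brain, kind, value,
    # target_dict, cred_dict).
    @policy.register('secret')
    def _check_secret(brain, match_kind, match, target_dict, cred_dict):
        return cred_dict.get('secret') == match


    # Without exc, a failed check now returns False instead of raising.
    assert not policy.enforce(('secret:xyzzy',), {}, {'secret': 'nope'})

    # With exc, the extra arguments are forwarded to the exception class.
    try:
        policy.enforce(('secret:xyzzy',), {}, {'secret': 'nope'},
                       Forbidden, action='volume:create')
    except Forbidden:
        pass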

cinder/openstack/common/rpc/impl_kombu.py

@@ -33,6 +33,7 @@ from cinder.openstack.common import cfg
 from cinder.openstack.common.gettextutils import _
 from cinder.openstack.common.rpc import amqp as rpc_amqp
 from cinder.openstack.common.rpc import common as rpc_common
+from cinder.openstack.common import network_utils

 kombu_opts = [
     cfg.StrOpt('kombu_ssl_version',

@@ -50,10 +51,13 @@ kombu_opts = [
                     '(valid only if SSL enabled)')),
     cfg.StrOpt('rabbit_host',
                default='localhost',
-               help='the RabbitMQ host'),
+               help='The RabbitMQ broker address where a single node is used'),
     cfg.IntOpt('rabbit_port',
                default=5672,
-               help='the RabbitMQ port'),
+               help='The RabbitMQ broker port where a single node is used'),
+    cfg.ListOpt('rabbit_hosts',
+                default=['$rabbit_host:$rabbit_port'],
+                help='RabbitMQ HA cluster host:port pairs'),
     cfg.BoolOpt('rabbit_use_ssl',
                 default=False,
                 help='connect over SSL for RabbitMQ'),

@@ -80,6 +84,11 @@ kombu_opts = [
     cfg.BoolOpt('rabbit_durable_queues',
                 default=False,
                 help='use durable queues in RabbitMQ'),
+    cfg.BoolOpt('rabbit_ha_queues',
+                default=False,
+                help='use H/A queues in RabbitMQ (x-ha-policy: all).'
+                     'You need to wipe RabbitMQ database when '
+                     'changing this option.'),
 ]

@@ -88,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts)

 LOG = rpc_common.LOG


+def _get_queue_arguments(conf):
+    """Construct the arguments for declaring a queue.
+
+    If the rabbit_ha_queues option is set, we declare a mirrored queue
+    as described here:
+
+      http://www.rabbitmq.com/ha.html
+
+    Setting x-ha-policy to all means that the queue will be mirrored
+    to all nodes in the cluster.
+    """
+    return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
+
+
 class ConsumerBase(object):
     """Consumer base class."""

@@ -192,7 +215,7 @@ class TopicConsumer(ConsumerBase):
     """Consumer class for 'topic'"""

     def __init__(self, conf, channel, topic, callback, tag, name=None,
-                 **kwargs):
+                 exchange_name=None, **kwargs):
         """Init a 'topic' queue.

         :param channel: the amqp channel to use

@@ -207,13 +230,15 @@ class TopicConsumer(ConsumerBase):
         """
         # Default options
         options = {'durable': conf.rabbit_durable_queues,
+                   'queue_arguments': _get_queue_arguments(conf),
                    'auto_delete': False,
                    'exclusive': False}
         options.update(kwargs)
-        exchange = kombu.entity.Exchange(
-            name=rpc_amqp.get_control_exchange(conf),
-            type='topic', durable=options['durable'],
-            auto_delete=options['auto_delete'])
+        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
+        exchange = kombu.entity.Exchange(name=exchange_name,
+                                         type='topic',
+                                         durable=options['durable'],
+                                         auto_delete=options['auto_delete'])
         super(TopicConsumer, self).__init__(channel,
                                             callback,
                                             tag,

@@ -307,9 +332,12 @@ class TopicPublisher(Publisher):
                    'auto_delete': False,
                    'exclusive': False}
         options.update(kwargs)
+        exchange_name = rpc_amqp.get_control_exchange(conf)
         super(TopicPublisher, self).__init__(channel,
-                                             rpc_amqp.get_control_exchange(conf), topic,
-                                             type='topic', **options)
+                                             exchange_name,
+                                             topic,
+                                             type='topic',
+                                             **options)


 class FanoutPublisher(Publisher):

@@ -332,6 +360,7 @@ class NotifyPublisher(TopicPublisher):

     def __init__(self, conf, channel, topic, **kwargs):
         self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+        self.queue_arguments = _get_queue_arguments(conf)
         super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)

     def reconnect(self, channel):

@@ -344,7 +373,8 @@ class NotifyPublisher(TopicPublisher):
                                    exchange=self.exchange,
                                    durable=self.durable,
                                    name=self.routing_key,
-                                   routing_key=self.routing_key)
+                                   routing_key=self.routing_key,
+                                   queue_arguments=self.queue_arguments)

         queue.declare()

@@ -369,31 +399,37 @@ class Connection(object):
         if server_params is None:
             server_params = {}

         # Keys to translate from server_params to kombu params
         server_params_to_kombu_params = {'username': 'userid'}

-        params = {}
-        for sp_key, value in server_params.iteritems():
-            p_key = server_params_to_kombu_params.get(sp_key, sp_key)
-            params[p_key] = value
-
-        params.setdefault('hostname', self.conf.rabbit_host)
-        params.setdefault('port', self.conf.rabbit_port)
-        params.setdefault('userid', self.conf.rabbit_userid)
-        params.setdefault('password', self.conf.rabbit_password)
-        params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
-
-        self.params = params
-
-        if self.conf.fake_rabbit:
-            self.params['transport'] = 'memory'
-            self.memory_transport = True
-        else:
-            self.memory_transport = False
-
-        if self.conf.rabbit_use_ssl:
-            self.params['ssl'] = self._fetch_ssl_params()
+        ssl_params = self._fetch_ssl_params()
+        params_list = []
+        for adr in self.conf.rabbit_hosts:
+            hostname, port = network_utils.parse_host_port(
+                adr, default_port=self.conf.rabbit_port)
+
+            params = {}
+
+            for sp_key, value in server_params.iteritems():
+                p_key = server_params_to_kombu_params.get(sp_key, sp_key)
+                params[p_key] = value
+
+            params.setdefault('hostname', hostname)
+            params.setdefault('port', port)
+            params.setdefault('userid', self.conf.rabbit_userid)
+            params.setdefault('password', self.conf.rabbit_password)
+            params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+
+            if self.conf.fake_rabbit:
+                params['transport'] = 'memory'
+            if self.conf.rabbit_use_ssl:
+                params['ssl'] = ssl_params
+
+            params_list.append(params)
+
+        self.params_list = params_list
+
+        self.memory_transport = self.conf.fake_rabbit

         self.connection = None
         self.reconnect()

@@ -423,14 +459,14 @@ class Connection(object):
         # Return the extended behavior
         return ssl_params

-    def _connect(self):
+    def _connect(self, params):
         """Connect to rabbit.  Re-establish any queues that may have
         been declared before if we are reconnecting.  Exceptions should
         be handled by the caller.
         """
         if self.connection:
             LOG.info(_("Reconnecting to AMQP server on "
-                       "%(hostname)s:%(port)d") % self.params)
+                       "%(hostname)s:%(port)d") % params)
             try:
                 self.connection.close()
             except self.connection_errors:

@@ -438,7 +474,7 @@ class Connection(object):
             # Setting this in case the next statement fails, though
             # it shouldn't be doing any network operations, yet.
             self.connection = None
-        self.connection = kombu.connection.BrokerConnection(**self.params)
+        self.connection = kombu.connection.BrokerConnection(**params)
         self.connection_errors = self.connection.connection_errors
         if self.memory_transport:
             # Kludge to speed up tests.

@@ -451,8 +487,8 @@ class Connection(object):
             self.channel._new_queue('ae.undeliver')
         for consumer in self.consumers:
             consumer.reconnect(self.channel)
-        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
-                 self.params)
+        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+                 params)

     def reconnect(self):
         """Handles reconnecting and re-establishing queues.

@@ -465,11 +501,12 @@ class Connection(object):
         attempt = 0
         while True:
+            params = self.params_list[attempt % len(self.params_list)]
             attempt += 1
             try:
-                self._connect()
+                self._connect(params)
                 return
-            except (self.connection_errors, IOError), e:
+            except (IOError, self.connection_errors) as e:
                 pass
             except Exception, e:
                 # NOTE(comstud): Unfortunately it's possible for amqplib

@@ -484,12 +521,12 @@ class Connection(object):
             log_info = {}
             log_info['err_str'] = str(e)
             log_info['max_retries'] = self.max_retries
-            log_info.update(self.params)
+            log_info.update(params)

             if self.max_retries and attempt == self.max_retries:
-                LOG.exception(_('Unable to connect to AMQP server on '
-                                '%(hostname)s:%(port)d after %(max_retries)d '
-                                'tries: %(err_str)s') % log_info)
+                LOG.error(_('Unable to connect to AMQP server on '
+                            '%(hostname)s:%(port)d after %(max_retries)d '
+                            'tries: %(err_str)s') % log_info)
                 # NOTE(comstud): Copied from original code.  There's
                 # really no better recourse because if this was a queue we
                 # need to consume on, we have no way to consume anymore.

@@ -503,9 +540,9 @@ class Connection(object):
             sleep_time = min(sleep_time, self.interval_max)

             log_info['sleep_time'] = sleep_time
-            LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
-                            ' unreachable: %(err_str)s. Trying again in '
-                            '%(sleep_time)d seconds.') % log_info)
+            LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
+                        'unreachable: %(err_str)s. Trying again in '
+                        '%(sleep_time)d seconds.') % log_info)
             time.sleep(sleep_time)

@@ -513,7 +550,8 @@ class Connection(object):
     def ensure(self, error_callback, method, *args, **kwargs):
         while True:
             try:
                 return method(*args, **kwargs)
             except (self.connection_errors, socket.timeout, IOError), e:
-                pass
+                if error_callback:
+                    error_callback(e)
             except Exception, e:
                 # NOTE(comstud): Unfortunately it's possible for amqplib
                 # to return an error not covered by its transport

@@ -523,8 +561,8 @@ class Connection(object):
                 # and try to reconnect in this case.
                 if 'timeout' not in str(e):
                     raise
-            if error_callback:
-                error_callback(e)
+                if error_callback:
+                    error_callback(e)
             self.reconnect()

     def get_channel(self):

@@ -626,10 +664,12 @@ class Connection(object):
         """
         self.declare_consumer(DirectConsumer, topic, callback)

-    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+    def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+                               exchange_name=None):
         """Create a 'topic' consumer."""
         self.declare_consumer(functools.partial(TopicConsumer,
                                                 name=queue_name,
+                                                exchange_name=exchange_name,
                                                 ),
                               topic, callback)
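The reconnect() loop above now rotates through params_list, one entry per
rabbit_hosts broker, so each retry targets the next node. A standalone sketch
of that selection logic (broker names invented):

    def pick_broker(params_list, attempt):
        # Mirrors reconnect(): attempt N (0-based) indexes round-robin.
        return params_list[attempt % len(params_list)]

    brokers = [{'hostname': 'rabbit1', 'port': 5672},
               {'hostname': 'rabbit2', 'port': 5672}]
    assert [pick_broker(brokers, n)['hostname'] for n in range(4)] == \
        ['rabbit1', 'rabbit2', 'rabbit1', 'rabbit2']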

cinder/openstack/common/rpc/impl_qpid.py

@@ -170,7 +170,8 @@ class DirectConsumer(ConsumerBase):

 class TopicConsumer(ConsumerBase):
     """Consumer class for 'topic'"""

-    def __init__(self, conf, session, topic, callback, name=None):
+    def __init__(self, conf, session, topic, callback, name=None,
+                 exchange_name=None):
         """Init a 'topic' queue.

         :param session: the amqp session to use

@@ -180,9 +181,10 @@ class TopicConsumer(ConsumerBase):
         :param name: optional queue name, defaults to topic
         """

+        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
         super(TopicConsumer, self).__init__(session, callback,
-                                            "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
+                                            "%s/%s" % (exchange_name, topic),
                                             {}, name or topic, {})


 class FanoutConsumer(ConsumerBase):

@@ -255,8 +257,9 @@ class TopicPublisher(Publisher):
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
+        exchange_name = rpc_amqp.get_control_exchange(conf)
         super(TopicPublisher, self).__init__(session,
-                                             "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic))
+                                             "%s/%s" % (exchange_name, topic))


 class FanoutPublisher(Publisher):

@@ -274,9 +277,10 @@ class NotifyPublisher(Publisher):
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
+        exchange_name = rpc_amqp.get_control_exchange(conf)
         super(NotifyPublisher, self).__init__(session,
-                                              "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
+                                              "%s/%s" % (exchange_name, topic),
                                               {"durable": True})


 class Connection(object):

@@ -461,10 +465,12 @@ class Connection(object):
         """
         self.declare_consumer(DirectConsumer, topic, callback)

-    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+    def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+                               exchange_name=None):
         """Create a 'topic' consumer."""
         self.declare_consumer(functools.partial(TopicConsumer,
                                                 name=queue_name,
+                                                exchange_name=exchange_name,
                                                 ),
                               topic, callback)
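Both this driver and impl_kombu now thread exchange_name through
declare_topic_consumer(), which allows listening on an exchange other than
the control exchange. A hedged usage sketch (the callback, queue name, and
'glance' exchange are invented):

    from cinder.openstack.common import rpc


    def _on_message(msg):
        print msg


    def listen_on_foreign_exchange():
        conn = rpc.create_connection(new=True)
        # exchange_name=None keeps the old control-exchange behaviour.
        conn.declare_topic_consumer('notifications.info',
                                    callback=_on_message,
                                    queue_name='listener-1',
                                    exchange_name='glance')
        conn.consume_in_thread()
        return conn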

cinder/openstack/common/rpc/impl_zmq.py

@@ -58,6 +58,9 @@ zmq_opts = [
     cfg.IntOpt('rpc_zmq_port', default=9501,
                help='ZeroMQ receiver listening port'),

+    cfg.IntOpt('rpc_zmq_port_pub', default=9502,
+               help='ZeroMQ fanout publisher port'),
+
     cfg.IntOpt('rpc_zmq_contexts', default=1,
                help='Number of ZeroMQ contexts, defaults to 1'),

@@ -206,7 +209,7 @@ class ZmqClient(object):
         self.outq = ZmqSocket(addr, socket_type, bind=bind)

     def cast(self, msg_id, topic, data):
-        self.outq.send([str(msg_id), str(topic), str('cast'),
+        self.outq.send([str(topic), str(msg_id), str('cast'),
                         _serialize(data)])

     def close(self):

@@ -299,6 +302,9 @@ class ConsumerBase(object):
         else:
             return [result]

+    def consume(self, sock):
+        raise NotImplementedError()
+
     def process(self, style, target, proxy, ctx, data):
         # Methods starting with - are
         # processed internally. (non-valid method name)

@@ -411,12 +417,17 @@ class ZmqProxy(ZmqBaseReactor):
                                                      zmq.PUB, bind=True)
         self.sockets.append(self.topic_proxy['zmq_replies'])

+        self.topic_proxy['fanout~'] = \
+            ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address,
+                      CONF.rpc_zmq_port_pub), zmq.PUB, bind=True)
+        self.sockets.append(self.topic_proxy['fanout~'])
+
     def consume(self, sock):
         ipc_dir = CONF.rpc_zmq_ipc_dir

         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
         data = sock.recv()
-        msg_id, topic, style, in_msg = data
+        topic, msg_id, style, in_msg = data
         topic = topic.split('.', 1)[0]

         LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))

@@ -424,6 +435,11 @@ class ZmqProxy(ZmqBaseReactor):
         # Handle zmq_replies magic
         if topic.startswith('fanout~'):
             sock_type = zmq.PUB
+            # This doesn't change what is in the message,
+            # it only specifies that these messages go to
+            # the generic fanout topic.
+            topic = 'fanout~'
         elif topic.startswith('zmq_replies'):
             sock_type = zmq.PUB
             inside = _deserialize(in_msg)

@@ -434,23 +450,32 @@ class ZmqProxy(ZmqBaseReactor):
         else:
             sock_type = zmq.PUSH

         if not topic in self.topic_proxy:
             outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
                              sock_type, bind=True)
             self.topic_proxy[topic] = outq
             self.sockets.append(outq)
             LOG.info(_("Created topic proxy: %s"), topic)

+            # It takes some time for a pub socket to open,
+            # before we can have any faith in doing a send() to it.
+            if sock_type == zmq.PUB:
+                eventlet.sleep(.5)
+
         LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
         self.topic_proxy[topic].send(data)
         LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})


+class CallbackReactor(ZmqBaseReactor):
+    """
+    A consumer class passing messages to a callback
+    """
+
+    def __init__(self, conf, callback):
+        self._cb = callback
+        super(CallbackReactor, self).__init__(conf)
+
+    def consume(self, sock):
+        data = sock.recv()
+        self._cb(data[3])
+
+
 class ZmqReactor(ZmqBaseReactor):
     """
     A consumer class implementing a

@@ -471,7 +496,7 @@ class ZmqReactor(ZmqBaseReactor):
             self.mapping[sock].send(data)
             return

-        msg_id, topic, style, in_msg = data
+        topic, msg_id, style, in_msg = data

         ctx, request = _deserialize(in_msg)
         ctx = RpcContext.unmarshal(ctx)

@@ -488,6 +513,26 @@ class Connection(rpc_common.Connection):

     def __init__(self, conf):
         self.reactor = ZmqReactor(conf)

+    def _consume_fanout(self, reactor, topic, proxy, bind=False):
+        for topic, host in matchmaker.queues("publishers~%s" % (topic, )):
+            inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port)
+            reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind)
+
+    def declare_topic_consumer(self, topic, callback=None,
+                               queue_name=None):
+        """declare_topic_consumer is a private method, but
+        it is being used by Quantum (Folsom).
+        This has been added for compatibility.
+        """
+        # Only consume on the base topic name.
+        topic = topic.split('.', 1)[0]
+
+        if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )):
+            return
+
+        reactor = CallbackReactor(CONF, callback)
+        self._consume_fanout(reactor, topic, None, bind=False)
+
     def create_consumer(self, topic, proxy, fanout=False):
         # Only consume on the base topic name.
         topic = topic.split('.', 1)[0]

@@ -495,22 +540,35 @@ class Connection(rpc_common.Connection):
         LOG.info(_("Create Consumer for topic (%(topic)s)") %
                  {'topic': topic})

-        # Subscription scenarios
+        # Consume direct-push fanout messages (relay to local consumers)
         if fanout:
-            subscribe = ('', fanout)[type(fanout) == str]
+            # If we're not in here, we can't receive direct fanout messages
+            if CONF.rpc_zmq_host in matchmaker.queues(topic):
+                # Consume from all remote publishers.
+                self._consume_fanout(self.reactor, topic, proxy)
+            else:
+                LOG.warn("This service cannot receive direct PUSH fanout "
+                         "messages without being known by the matchmaker.")
+                return
+
+            # Configure consumer for direct pushes.
+            subscribe = (topic, fanout)[type(fanout) == str]
             sock_type = zmq.SUB
             topic = 'fanout~' + topic
+
+            inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, )
         else:
             sock_type = zmq.PULL
             subscribe = None

-        # Receive messages from (local) proxy
-        inaddr = "ipc://%s/zmq_topic_%s" % \
-            (CONF.rpc_zmq_ipc_dir, topic)
+            # Receive messages from (local) proxy
+            inaddr = "ipc://%s/zmq_topic_%s" % \
+                (CONF.rpc_zmq_ipc_dir, topic)

         LOG.debug(_("Consumer is a zmq.%s"),
                   ['PULL', 'SUB'][sock_type == zmq.SUB])

+        # Consume messages from local rpc-zmq-receiver daemon.
         self.reactor.register(proxy, inaddr, sock_type,
                               subscribe=subscribe, in_bind=False)
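Putting the topic first in the frame matters because ZeroMQ SUB sockets
filter on a message's leading bytes, so the new fanout PUB socket can rely
on prefix subscriptions. A toy illustration of that matching rule (values
invented):

    def sub_delivers(subscribe_prefix, frame):
        # zmq.SUB drops any multipart message whose first frame does not
        # start with a subscribed prefix; topic-first framing makes the
        # topic that prefix.
        return frame[0].startswith(subscribe_prefix)

    frame = ['volume', 'msg-1', 'cast', '<serialized payload>']
    assert sub_delivers('volume', frame)
    assert not sub_delivers('network', frame)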

cinder/openstack/common/rpc/matchmaker.py

@@ -132,6 +132,14 @@ class FanoutBinding(Binding):
         return False


+class PublisherBinding(Binding):
+    """Match on publishers keys, where key starts with 'publishers~' string."""
+    def test(self, key):
+        if key.startswith('publishers~'):
+            return True
+        return False
+
+
 class StubExchange(Exchange):
     """Exchange that does nothing."""
     def run(self, key):

@@ -182,6 +190,23 @@ class RoundRobinRingExchange(RingExchange):
         return [(key + '.' + host, host)]


+class PublisherRingExchange(RingExchange):
+    """Fanout Exchange based on a hashmap."""
+    def __init__(self, ring=None):
+        super(PublisherRingExchange, self).__init__(ring)
+
+    def run(self, key):
+        # Assume starts with "publishers~", strip it for lookup.
+        nkey = key.split('publishers~')[1:][0]
+        if not self._ring_has(nkey):
+            LOG.warn(
+                _("No key defining hosts for topic '%s', "
+                  "see ringfile") % (nkey, )
+            )
+            return []
+        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
 class FanoutRingExchange(RingExchange):
     """Fanout Exchange based on a hashmap."""
     def __init__(self, ring=None):

@@ -196,7 +221,8 @@ class FanoutRingExchange(RingExchange):
                 "see ringfile") % (nkey, )
             )
             return []
-        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+        return map(lambda x: (key + '.' + x, x), self.ring[nkey] +
+                   ['localhost'])


 class LocalhostExchange(Exchange):

@@ -227,6 +253,7 @@ class MatchMakerRing(MatchMakerBase):
     """
     def __init__(self, ring=None):
         super(MatchMakerRing, self).__init__()
+        self.add_binding(PublisherBinding(), PublisherRingExchange(ring))
         self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
         self.add_binding(DirectBinding(), DirectExchange())
         self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))

@@ -239,6 +266,7 @@ class MatchMakerLocalhost(MatchMakerBase):
     """
     def __init__(self):
         super(MatchMakerLocalhost, self).__init__()
+        self.add_binding(PublisherBinding(), LocalhostExchange())
         self.add_binding(FanoutBinding(), LocalhostExchange())
         self.add_binding(DirectBinding(), DirectExchange())
         self.add_binding(TopicBinding(), LocalhostExchange())

@@ -253,6 +281,7 @@ class MatchMakerStub(MatchMakerBase):
     def __init__(self):
         super(MatchMakerLocalhost, self).__init__()
+        self.add_binding(PublisherBinding(), StubExchange())
         self.add_binding(FanoutBinding(), StubExchange())
         self.add_binding(DirectBinding(), StubExchange())
         self.add_binding(TopicBinding(), StubExchange())
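To see what the new binding resolves to, here is a small stand-in for
PublisherRingExchange.run() with a hypothetical ringfile mapping:

    ring = {'volume': ['host1', 'host2']}  # hypothetical ringfile contents


    def expand(key):
        # 'publishers~volume' -> lookup under 'volume', one entry per host.
        nkey = key.split('publishers~')[1:][0]
        return [(key + '.' + host, host) for host in ring.get(nkey, [])]

    assert expand('publishers~volume') == [
        ('publishers~volume.host1', 'host1'),
        ('publishers~volume.host2', 'host2'),
    ]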

cinder/openstack/common/rpc/service.py

@@ -0,0 +1,69 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import log as logging
from cinder.openstack.common import rpc
from cinder.openstack.common import service


LOG = logging.getLogger(__name__)


class Service(service.Service):
    """Service object for binaries running on hosts.

    A service enables rpc by listening to queues based on topic and host."""
    def __init__(self, host, topic, manager=None):
        super(Service, self).__init__()
        self.host = host
        self.topic = topic
        if manager is None:
            self.manager = self
        else:
            self.manager = manager

    def start(self):
        super(Service, self).start()

        self.conn = rpc.create_connection(new=True)
        LOG.debug(_("Creating Consumer connection for Service %s") %
                  self.topic)

        rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager])

        # Share this same connection for these Consumers
        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)

        node_topic = '%s.%s' % (self.topic, self.host)
        self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)

        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)

        # Consume from all consumers in a thread
        self.conn.consume_in_thread()

    def stop(self):
        # Try to shut the connection down, but if we get any sort of
        # errors, go ahead and ignore them.. as we're shutting down anyway
        try:
            self.conn.close()
        except Exception:
            pass
        super(Service, self).stop()
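A minimal sketch of wiring a manager into this helper; the EchoManager class
and the topic are invented:

    from cinder.openstack.common.rpc import service as rpc_service


    class EchoManager(object):
        def echo(self, context, arg):
            return arg


    svc = rpc_service.Service('host1', 'volume', manager=EchoManager())
    # svc.start() subscribes the manager on 'volume', 'volume.host1' and
    # the fanout queue, then consumes in a green thread.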

cinder/openstack/common/timeutils.py

@@ -62,9 +62,11 @@ def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):

 def normalize_time(timestamp):
-    """Normalize time in arbitrary timezone to UTC"""
+    """Normalize time in arbitrary timezone to UTC naive object"""
     offset = timestamp.utcoffset()
-    return timestamp.replace(tzinfo=None) - offset if offset else timestamp
+    if offset is None:
+        return timestamp
+    return timestamp.replace(tzinfo=None) - offset


 def is_older_than(before, seconds):

@@ -121,6 +123,10 @@ def marshall_now(now=None):

 def unmarshall_time(tyme):
     """Unmarshall a datetime dict."""
-    return datetime.datetime(day=tyme['day'], month=tyme['month'],
-                             year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'],
-                             second=tyme['second'], microsecond=tyme['microsecond'])
+    return datetime.datetime(day=tyme['day'],
+                             month=tyme['month'],
+                             year=tyme['year'],
+                             hour=tyme['hour'],
+                             minute=tyme['minute'],
+                             second=tyme['second'],
+                             microsecond=tyme['microsecond'])
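Besides the docstring fix, the rewrite corrects an edge case: a UTC-aware
timestamp has a zero offset, which is falsy, so the old expression returned
it with tzinfo still attached. A quick check of the new contract (FixedOffset
is a local stand-in tzinfo):

    import datetime

    from cinder.openstack.common import timeutils


    class FixedOffset(datetime.tzinfo):
        """Stand-in tzinfo with a constant UTC offset."""
        def __init__(self, minutes):
            self._offset = datetime.timedelta(minutes=minutes)

        def utcoffset(self, dt):
            return self._offset

        def dst(self, dt):
            return datetime.timedelta(0)


    aware = datetime.datetime(2012, 9, 28, 16, 27,
                              tzinfo=FixedOffset(-300))
    naive = timeutils.normalize_time(aware)
    assert naive == datetime.datetime(2012, 9, 28, 21, 27)  # naive UTC

    utc_aware = datetime.datetime(2012, 9, 28, 16, 27,
                                  tzinfo=FixedOffset(0))
    assert timeutils.normalize_time(utc_aware).tzinfo is None  # fixed case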

cinder/policy.py

@@ -17,10 +17,10 @@

 """Policy Engine For Cinder"""

-from cinder.common import policy
 from cinder import exception
 from cinder import flags
 from cinder.openstack.common import cfg
+from cinder.openstack.common import policy
 from cinder import utils

@@ -84,7 +84,5 @@ def enforce(context, action, target):
     match_list = ('rule:%s' % action,)
     credentials = context.to_dict()

-    try:
-        policy.enforce(match_list, target, credentials)
-    except policy.NotAuthorized:
-        raise exception.PolicyNotAuthorized(action=action)
+    policy.enforce(match_list, target, credentials,
+                   exception.PolicyNotAuthorized, action=action)

cinder/tests/test_policy.py

@@ -21,11 +21,11 @@ import os.path
 import StringIO
 import urllib2

-from cinder.common import policy as common_policy
 from cinder import context
 from cinder import exception
 from cinder import flags
-import cinder.common.policy
+import cinder.openstack.common.policy
+from cinder.openstack.common import policy as common_policy
 from cinder import policy
 from cinder import test
 from cinder import utils

@@ -169,8 +169,9 @@ class DefaultPolicyTestCase(test.TestCase):
         self.context = context.RequestContext('fake', 'fake')

     def _set_brain(self, default_rule):
-        brain = cinder.common.policy.HttpBrain(self.rules, default_rule)
-        cinder.common.policy.set_brain(brain)
+        brain = cinder.openstack.common.policy.HttpBrain(self.rules,
+                                                         default_rule)
+        cinder.openstack.common.policy.set_brain(brain)

     def tearDown(self):
         super(DefaultPolicyTestCase, self).tearDown()

openstack-common.conf

@@ -1,7 +1,7 @@
 [DEFAULT]

 # The list of modules to copy from openstack-common
-modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup,notifier,context
+modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup,notifier,context,network_utils,policy

 # The base module to hold the copy of openstack.common
 base=cinder