Add utils & exception from openstack-common

Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
This commit is contained in:
Angus Salkeld
2012-04-05 16:39:22 +10:00
parent aff20aca99
commit e7f05dba5d
18 changed files with 595 additions and 449 deletions

View File

@@ -18,16 +18,7 @@
# under the License.
from heat.openstack.common import cfg
from heat.common import utils
from heat.common import config
rpc_backend_opt = cfg.StrOpt('rpc_backend',
default='heat.rpc.impl_qpid',
help="The messaging module to use, defaults to kombu.")
FLAGS = config.FLAGS
FLAGS.register_opt(rpc_backend_opt)
from heat.openstack.common import utils
def create_connection(new=True):
@@ -193,10 +184,17 @@ def fanout_cast_to_server(context, server_params, topic, msg):
_RPCIMPL = None
def configure(conf):
"""Delay import of rpc_backend until FLAGS are loaded."""
print 'configuring rpc %s' % conf.rpc_backend
global _RPCIMPL
_RPCIMPL = utils.import_object(conf.rpc_backend)
def _get_impl():
"""Delay import of rpc_backend until FLAGS are loaded."""
global _RPCIMPL
if _RPCIMPL is None:
_RPCIMPL = utils.import_object(FLAGS.rpc_backend)
print 'rpc not configured'
return _RPCIMPL

View File

@@ -42,13 +42,14 @@ from heat.openstack.common import local
import heat.rpc.common as rpc_common
LOG = logging.getLogger(__name__)
FLAGS = config.FLAGS
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
def __init__(self, *args, **kwargs):
self.connection_cls = kwargs.pop("connection_cls", None)
kwargs.setdefault("max_size", config.FLAGS.rpc_conn_pool_size)
kwargs.setdefault("max_size", FLAGS.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
@@ -206,7 +207,7 @@ class ProxyCallback(object):
def __init__(self, proxy, connection_pool):
self.proxy = proxy
self.pool = greenpool.GreenPool(config.FLAGS.rpc_thread_pool_size)
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
self.connection_pool = connection_pool
def __call__(self, message_data):
@@ -267,7 +268,7 @@ class MulticallWaiter(object):
def __init__(self, connection, timeout):
self._connection = connection
self._iterator = connection.iterconsume(
timeout=timeout or config.FLAGS.rpc_response_timeout)
timeout=timeout or FLAGS.rpc_response_timeout)
self._result = None
self._done = False
self._got_ending = False

View File

@@ -20,29 +20,15 @@
import copy
import logging
from heat.common import exception
from heat.openstack.common import cfg
from heat.openstack.common import exception
from heat.common import config
LOG = logging.getLogger(__name__)
rpc_opts = [
cfg.IntOpt('rpc_thread_pool_size',
default=1024,
help='Size of RPC thread pool'),
cfg.IntOpt('rpc_conn_pool_size',
default=30,
help='Size of RPC connection pool'),
cfg.IntOpt('rpc_response_timeout',
default=60,
help='Seconds to wait for a response from call or multicall'),
]
config.FLAGS.register_opts(rpc_opts)
class RemoteError(exception.NovaException):
class RemoteError(exception.OpenstackException):
"""Signifies that a remote class has raised an exception.
Contains a string representation of the type of the original exception,
@@ -62,7 +48,7 @@ class RemoteError(exception.NovaException):
traceback=traceback)
class Timeout(exception.NovaException):
class Timeout(exception.OpenstackException):
"""Signifies that a timeout has occurred.
This exception is raised if the rpc_response_timeout is reached while

View File

@@ -33,55 +33,6 @@ from heat.rpc import common as rpc_common
LOG = logging.getLogger(__name__)
qpid_opts = [
cfg.StrOpt('qpid_hostname',
default='localhost',
help='Qpid broker hostname'),
cfg.StrOpt('qpid_port',
default='5672',
help='Qpid broker port'),
cfg.StrOpt('qpid_username',
default='',
help='Username for qpid connection'),
cfg.StrOpt('qpid_password',
default='',
help='Password for qpid connection'),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
help='Space separated list of SASL mechanisms to use for auth'),
cfg.BoolOpt('qpid_reconnect',
default=True,
help='Automatically reconnect'),
cfg.IntOpt('qpid_reconnect_timeout',
default=0,
help='Reconnection timeout in seconds'),
cfg.IntOpt('qpid_reconnect_limit',
default=0,
help='Max reconnections before giving up'),
cfg.IntOpt('qpid_reconnect_interval_min',
default=0,
help='Minimum seconds between reconnection attempts'),
cfg.IntOpt('qpid_reconnect_interval_max',
default=0,
help='Maximum seconds between reconnection attempts'),
cfg.IntOpt('qpid_reconnect_interval',
default=0,
help='Equivalent to setting max and min to the same value'),
cfg.IntOpt('qpid_heartbeat',
default=5,
help='Seconds between connection keepalive heartbeats'),
cfg.StrOpt('qpid_protocol',
default='tcp',
help="Transport to use, either 'tcp' or 'ssl'"),
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
help='Disable Nagle algorithm'),
]
FLAGS = config.FLAGS
FLAGS.register_opts(qpid_opts)
class ConsumerBase(object):
"""Consumer base class."""
@@ -174,7 +125,7 @@ class TopicConsumer(ConsumerBase):
"""
super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (FLAGS.control_exchange, topic), {},
"%s/%s" % (config.FLAGS.control_exchange, topic), {},
topic, {})
@@ -248,7 +199,7 @@ class TopicPublisher(Publisher):
"""init a 'topic' publisher.
"""
super(TopicPublisher, self).__init__(session,
"%s/%s" % (FLAGS.control_exchange, topic))
"%s/%s" % (config.FLAGS.control_exchange, topic))
class FanoutPublisher(Publisher):
@@ -266,7 +217,7 @@ class NotifyPublisher(Publisher):
"""init a 'topic' publisher.
"""
super(NotifyPublisher, self).__init__(session,
"%s/%s" % (FLAGS.control_exchange, topic),
"%s/%s" % (config.FLAGS.control_exchange, topic),
{"durable": True})
@@ -281,10 +232,10 @@ class Connection(object):
if server_params is None:
server_params = {}
default_params = dict(hostname=FLAGS.qpid_hostname,
port=FLAGS.qpid_port,
username=FLAGS.qpid_username,
password=FLAGS.qpid_password)
default_params = dict(hostname=config.FLAGS.qpid_hostname,
port=config.FLAGS.qpid_port,
username=config.FLAGS.qpid_username,
password=config.FLAGS.qpid_password)
params = server_params
for key in default_params.keys():
@@ -298,23 +249,23 @@ class Connection(object):
# before we call open
self.connection.username = params['username']
self.connection.password = params['password']
self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms
self.connection.reconnect = FLAGS.qpid_reconnect
if FLAGS.qpid_reconnect_timeout:
self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout
if FLAGS.qpid_reconnect_limit:
self.connection.reconnect_limit = FLAGS.qpid_reconnect_limit
if FLAGS.qpid_reconnect_interval_max:
self.connection.sasl_mechanisms = config.FLAGS.qpid_sasl_mechanisms
self.connection.reconnect = config.FLAGS.qpid_reconnect
if config.FLAGS.qpid_reconnect_timeout:
self.connection.reconnect_timeout = config.FLAGS.qpid_reconnect_timeout
if config.FLAGS.qpid_reconnect_limit:
self.connection.reconnect_limit = config.FLAGS.qpid_reconnect_limit
if config.FLAGS.qpid_reconnect_interval_max:
self.connection.reconnect_interval_max = (
FLAGS.qpid_reconnect_interval_max)
if FLAGS.qpid_reconnect_interval_min:
config.FLAGS.qpid_reconnect_interval_max)
if config.FLAGS.qpid_reconnect_interval_min:
self.connection.reconnect_interval_min = (
FLAGS.qpid_reconnect_interval_min)
if FLAGS.qpid_reconnect_interval:
self.connection.reconnect_interval = FLAGS.qpid_reconnect_interval
self.connection.hearbeat = FLAGS.qpid_heartbeat
self.connection.protocol = FLAGS.qpid_protocol
self.connection.tcp_nodelay = FLAGS.qpid_tcp_nodelay
config.FLAGS.qpid_reconnect_interval_min)
if config.FLAGS.qpid_reconnect_interval:
self.connection.reconnect_interval = config.FLAGS.qpid_reconnect_interval
self.connection.hearbeat = config.FLAGS.qpid_heartbeat
self.connection.protocol = config.FLAGS.qpid_protocol
self.connection.tcp_nodelay = config.FLAGS.qpid_tcp_nodelay
# Open is part of reconnect -
# NOTE(WGH) not sure we need this with the reconnect flags
@@ -339,7 +290,7 @@ class Connection(object):
self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e:
LOG.error(_('Unable to connect to AMQP server: %s ') % e)
time.sleep(FLAGS.qpid_reconnect_interval or 1)
time.sleep(config.FLAGS.qpid_reconnect_interval or 1)
else:
break