Initial work on migrating heat-engine to rpc

Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
This commit is contained in:
Angus Salkeld 2012-03-27 11:38:48 +11:00
parent 91ee085a0d
commit 5c30a02e00
15 changed files with 2977 additions and 20 deletions

View File

@ -21,34 +21,37 @@ which then calls into this engine.
""" """
import gettext import gettext
import eventlet
eventlet.monkey_patch()
import os import os
import sys import sys
import logging
# If ../heat/__init__.py exists, add ../ to Python search path, so that # If ../heat/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python... # it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir, os.pardir,
os.pardir)) os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'heat', '__init__.py')): if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'heat', '__init__.py')):
sys.path.insert(0, possible_topdir) sys.path.insert(0, POSSIBLE_TOPDIR)
gettext.install('heat', unicode=1) gettext.install('heat', unicode=1)
from heat import service
from heat.common import utils
from heat.common import config from heat.common import config
from heat.common import wsgi
logger = logging.getLogger('heat.engine')
if __name__ == '__main__': if __name__ == '__main__':
try:
conf = config.HeatConfigOpts()
conf()
app = config.load_paste_app(conf) config.FLAGS(sys.argv)
config.setup_logging(config.FLAGS)
port = config.DEFAULT_PORT+1 #utils.monkey_patch()
print 'Starting Heat Engine on port %s' % port server = service.Service.create(binary='heat-engine',
server = wsgi.Server() topic='engine',
server.start(app, conf, default_port=port) manager='heat.engine.manager.EngineManager')
server.wait() service.serve(server)
except RuntimeError, e: service.wait()
sys.exit("ERROR: %s" % e)

View File

@ -22,6 +22,7 @@ import logging
import logging.config import logging.config
import logging.handlers import logging.handlers
import os import os
import socket
import sys import sys
from heat import version from heat import version
@ -37,6 +38,7 @@ paste_deploy_opts = [
] ]
class HeatConfigOpts(cfg.CommonConfigOpts): class HeatConfigOpts(cfg.CommonConfigOpts):
def __init__(self, default_config_files=None, **kwargs): def __init__(self, default_config_files=None, **kwargs):
@ -46,13 +48,27 @@ class HeatConfigOpts(cfg.CommonConfigOpts):
default_config_files=default_config_files, default_config_files=default_config_files,
**kwargs) **kwargs)
class HeatEngineConfigOpts(cfg.CommonConfigOpts):
engine_opts = [
cfg.StrOpt('host',
default=socket.gethostname(),
help='Name of this node. This can be an opaque identifier. '
'It is not necessarily a hostname, FQDN, or IP address.'),
cfg.StrOpt('instance_driver',
default='heat.engine.nova',
help='Driver to use for controlling instances'),
]
class HeatCacheConfigOpts(HeatConfigOpts): def __init__(self, default_config_files=None, **kwargs):
super(HeatEngineConfigOpts, self).__init__(
def __init__(self, **kwargs): project='heat',
version='%%prog %s' % version.version_string(),
**kwargs)
config_files = cfg.find_config_files(project='heat', config_files = cfg.find_config_files(project='heat',
prog='heat-cache') prog='heat-engine')
super(HeatCacheConfigOpts, self).__init__(config_files, **kwargs) self.register_cli_opts(self.engine_opts)
FLAGS = HeatEngineConfigOpts()
def setup_logging(conf): def setup_logging(conf):

View File

@ -17,6 +17,7 @@
"""Heat exception subclasses""" """Heat exception subclasses"""
import functools
import urlparse import urlparse
@ -53,6 +54,81 @@ class HeatException(Exception):
def __str__(self): def __str__(self):
return self._error_string return self._error_string
def wrap_exception(notifier=None, publisher_id=None, event_type=None,
level=None):
"""This decorator wraps a method to catch any exceptions that may
get thrown. It logs the exception as well as optionally sending
it to the notification system.
"""
# TODO(sandy): Find a way to import nova.notifier.api so we don't have
# to pass it in as a parameter. Otherwise we get a cyclic import of
# nova.notifier.api -> nova.utils -> nova.exception :(
# TODO(johannes): Also, it would be nice to use
# utils.save_and_reraise_exception() without an import loop
def inner(f):
def wrapped(*args, **kw):
try:
return f(*args, **kw)
except Exception, e:
# Save exception since it can be clobbered during processing
# below before we can re-raise
exc_info = sys.exc_info()
if notifier:
payload = dict(args=args, exception=e)
payload.update(kw)
# Use a temp vars so we don't shadow
# our outer definitions.
temp_level = level
if not temp_level:
temp_level = notifier.ERROR
temp_type = event_type
if not temp_type:
# If f has multiple decorators, they must use
# functools.wraps to ensure the name is
# propagated.
temp_type = f.__name__
notifier.notify(publisher_id, temp_type, temp_level,
payload)
# re-raise original exception since it may have been clobbered
raise exc_info[0], exc_info[1], exc_info[2]
return functools.wraps(f)(wrapped)
return inner
class NovaException(Exception):
"""Base Nova Exception
To correctly use this class, inherit from it and define
a 'message' property. That message will get printf'd
with the keyword arguments provided to the constructor.
"""
message = _("An unknown exception occurred.")
def __init__(self, message=None, **kwargs):
self.kwargs = kwargs
if 'code' not in self.kwargs:
try:
self.kwargs['code'] = self.code
except AttributeError:
pass
if not message:
try:
message = self.message % kwargs
except Exception as e:
# at least get the core message out if something happened
message = self.message
super(NovaException, self).__init__(message)
class MissingArgumentError(HeatException): class MissingArgumentError(HeatException):
message = _("Missing required argument.") message = _("Missing required argument.")

View File

@ -20,6 +20,7 @@
System-level utilities and helper functions. System-level utilities and helper functions.
""" """
import datetime
import sys import sys
import uuid import uuid
@ -75,3 +76,14 @@ def import_object(import_str):
def generate_uuid(): def generate_uuid():
return str(uuid.uuid4()) return str(uuid.uuid4())
def gen_uuid():
return uuid.uuid4()
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
return utcnow.override_time
return datetime.datetime.utcnow()
utcnow.override_time = None

123
heat/context.py Normal file
View File

@ -0,0 +1,123 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""RequestContext: context for requests that persist through all of nova."""
import copy
import logging
from heat import local
from heat.common import utils
LOG = logging.getLogger(__name__)
def generate_request_id():
return 'req-' + str(utils.gen_uuid())
class RequestContext(object):
"""Security context and request information.
Represents the user taking a given action within the system.
"""
def __init__(self, user_id, project_id, is_admin=None, read_deleted="no",
roles=None, remote_address=None, timestamp=None,
request_id=None, auth_token=None, overwrite=True, **kwargs):
"""
:param read_deleted: 'no' indicates deleted records are hidden, 'yes'
indicates deleted records are visible, 'only' indicates that
*only* deleted records are visible.
:param overwrite: Set to False to ensure that the greenthread local
copy of the index is not overwritten.
:param kwargs: Extra arguments that might be present, but we ignore
because they possibly came in from older rpc messages.
"""
if read_deleted not in ('no', 'yes', 'only'):
raise ValueError(_("read_deleted can only be one of 'no', "
"'yes' or 'only', not %r") % read_deleted)
if kwargs:
LOG.warn(_('Arguments dropped when creating context: %s') %
str(kwargs))
self.user_id = user_id
self.project_id = project_id
self.roles = roles or []
self.is_admin = is_admin
if self.is_admin is None:
self.is_admin = 'admin' in [x.lower() for x in self.roles]
elif self.is_admin and 'admin' not in self.roles:
self.roles.append('admin')
self.read_deleted = read_deleted
self.remote_address = remote_address
if not timestamp:
timestamp = utils.utcnow()
if isinstance(timestamp, basestring):
timestamp = utils.parse_strtime(timestamp)
self.timestamp = timestamp
if not request_id:
request_id = generate_request_id()
self.request_id = request_id
self.auth_token = auth_token
if overwrite or not hasattr(local.store, 'context'):
self.update_store()
def update_store(self):
local.store.context = self
def to_dict(self):
return {'user_id': self.user_id,
'project_id': self.project_id,
'is_admin': self.is_admin,
'read_deleted': self.read_deleted,
'roles': self.roles,
'remote_address': self.remote_address,
'timestamp': utils.strtime(self.timestamp),
'request_id': self.request_id,
'auth_token': self.auth_token}
@classmethod
def from_dict(cls, values):
return cls(**values)
def elevated(self, read_deleted=None, overwrite=False):
"""Return a version of this context with admin flag set."""
context = copy.copy(self)
context.is_admin = True
if 'admin' not in context.roles:
context.roles.append('admin')
if read_deleted is not None:
context.read_deleted = read_deleted
return context
def get_admin_context(read_deleted="no"):
return RequestContext(user_id=None,
project_id=None,
is_admin=True,
read_deleted=read_deleted,
overwrite=False)

65
heat/engine/manager.py Normal file
View File

@ -0,0 +1,65 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Handles all processes relating to instances (guest vms).
The :py:class:`ComputeManager` class is a :py:class:`heat.manager.Manager` that
handles RPC calls relating to creating instances. It is responsible for
building a disk image, launching it via the underlying virtualization driver,
responding to calls to check its state, attaching persistent storage, and
terminating it.
**Related Flags**
:instances_path: Where instances are kept on disk
:compute_driver: Name of class that is used to handle virtualization, loaded
by :func:`heat.utils.import_object`
"""
import contextlib
import functools
import os
import socket
import sys
import tempfile
import time
import traceback
import logging
from eventlet import greenthread
import heat.context
from heat import exception
from heat import manager
from heat.openstack.common import cfg
from heat import rpc
LOG = logging.getLogger(__name__)
class EngineManager(manager.Manager):
"""Manages the running instances from creation to destruction."""
def __init__(self, *args, **kwargs):
"""Load configuration options and connect to the hypervisor."""
def create(self, template, stack_id):
pass

37
heat/local.py Normal file
View File

@ -0,0 +1,37 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Greenthread local storage of variables using weak references"""
import weakref
from eventlet import corolocal
class WeakLocal(corolocal.local):
def __getattribute__(self, attr):
rval = corolocal.local.__getattribute__(self, attr)
if rval:
rval = rval()
return rval
def __setattr__(self, attr, value):
value = weakref.ref(value)
return corolocal.local.__setattr__(self, attr, value)
store = WeakLocal()

175
heat/manager.py Normal file
View File

@ -0,0 +1,175 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base Manager class.
Managers are responsible for a certain aspect of the system. It is a logical
grouping of code relating to a portion of the system. In general other
components should be using the manager to make changes to the components that
it is responsible for.
For example, other components that need to deal with volumes in some way,
should do so by calling methods on the VolumeManager instead of directly
changing fields in the database. This allows us to keep all of the code
relating to volumes in the same place.
We have adopted a basic strategy of Smart managers and dumb data, which means
rather than attaching methods to data objects, components should call manager
methods that act on the data.
Methods on managers that can be executed locally should be called directly. If
a particular method must execute on a remote host, this should be done via rpc
to the service that wraps the manager
Managers should be responsible for most of the db access, and
non-implementation specific data. Anything implementation specific that can't
be generalized should be done by the Driver.
In general, we prefer to have one manager with multiple drivers for different
implementations, but sometimes it makes sense to have multiple managers. You
can think of it this way: Abstract different overall strategies at the manager
level(FlatNetwork vs VlanNetwork), and different implementations at the driver
level(LinuxNetDriver vs CiscoNetDriver).
Managers will often provide methods for initial setup of a host or periodic
tasks to a wrapping service.
This module provides Manager, a base class for managers.
"""
import logging
from heat import version
from heat.common import config
FLAGS = config.FLAGS
LOG = logging.getLogger(__name__)
def periodic_task(*args, **kwargs):
"""Decorator to indicate that a method is a periodic task.
This decorator can be used in two ways:
1. Without arguments '@periodic_task', this will be run on every tick
of the periodic scheduler.
2. With arguments, @periodic_task(ticks_between_runs=N), this will be
run on every N ticks of the periodic scheduler.
"""
def decorator(f):
f._periodic_task = True
f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
return f
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
# and without parens.
#
# In the 'with-parens' case (with kwargs present), this function needs to
# return a decorator function since the interpreter will invoke it like:
#
# periodic_task(*args, **kwargs)(f)
#
# In the 'without-parens' case, the original function will be passed
# in as the first argument, like:
#
# periodic_task(f)
if kwargs:
return decorator
else:
return decorator(args[0])
class ManagerMeta(type):
def __init__(cls, names, bases, dict_):
"""Metaclass that allows us to collect decorated periodic tasks."""
super(ManagerMeta, cls).__init__(names, bases, dict_)
# NOTE(sirp): if the attribute is not present then we must be the base
# class, so, go ahead an initialize it. If the attribute is present,
# then we're a subclass so make a copy of it so we don't step on our
# parent's toes.
try:
cls._periodic_tasks = cls._periodic_tasks[:]
except AttributeError:
cls._periodic_tasks = []
try:
cls._ticks_to_skip = cls._ticks_to_skip.copy()
except AttributeError:
cls._ticks_to_skip = {}
for value in cls.__dict__.values():
if getattr(value, '_periodic_task', False):
task = value
name = task.__name__
cls._periodic_tasks.append((name, task))
cls._ticks_to_skip[name] = task._ticks_between_runs
class Manager(object):
__metaclass__ = ManagerMeta
def __init__(self, host=None, db_driver=None):
if not host:
host = FLAGS.host
self.host = host
super(Manager, self).__init__(db_driver)
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
for task_name, task in self._periodic_tasks:
full_task_name = '.'.join([self.__class__.__name__, task_name])
ticks_to_skip = self._ticks_to_skip[task_name]
if ticks_to_skip > 0:
LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
" ticks left until next run"), locals())
self._ticks_to_skip[task_name] -= 1
continue
self._ticks_to_skip[task_name] = task._ticks_between_runs
LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
try:
task(self, context)
except Exception as e:
if raise_on_error:
raise
LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
locals())
def init_host(self):
"""Handle initialization if this is a standalone service.
Child classes should override this method.
"""
pass
def service_version(self, context):
return version.version_string()
def service_config(self, context):
config = {}
for key in FLAGS:
config[key] = FLAGS.get(key, None)
return config

202
heat/rpc/__init__.py Normal file
View File

@ -0,0 +1,202 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heat.openstack.common import cfg
from heat.common import utils
from heat.common import config
rpc_backend_opt = cfg.StrOpt('rpc_backend',
default='heat.rpc.impl_qpid',
help="The messaging module to use, defaults to kombu.")
FLAGS = config.FLAGS
FLAGS.register_opt(rpc_backend_opt)
def create_connection(new=True):
"""Create a connection to the message bus used for rpc.
For some example usage of creating a connection and some consumers on that
connection, see nova.service.
:param new: Whether or not to create a new connection. A new connection
will be created by default. If new is False, the
implementation is free to return an existing connection from a
pool.
:returns: An instance of nova.rpc.common.Connection
"""
return _get_impl().create_connection(new=new)
def call(context, topic, msg, timeout=None):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
nova.rpc.common.Connection.create_consumer() and only applies
when the consumer was created with fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:returns: A dict from the remote method.
:raises: nova.rpc.common.Timeout if a complete response is not received
before the timeout is reached.
"""
return _get_impl().call(context, topic, msg, timeout)
def cast(context, topic, msg):
"""Invoke a remote method that does not return anything.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
nova.rpc.common.Connection.create_consumer() and only applies
when the consumer was created with fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:returns: None
"""
return _get_impl().cast(context, topic, msg)
def fanout_cast(context, topic, msg):
"""Broadcast a remote method invocation with no return.
This method will get invoked on all consumers that were set up with this
topic name and fanout=True.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
nova.rpc.common.Connection.create_consumer() and only applies
when the consumer was created with fanout=True.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:returns: None
"""
return _get_impl().fanout_cast(context, topic, msg)
def multicall(context, topic, msg, timeout=None):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
separate messages, so the return values can be processed as the come in via
an iterator.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
nova.rpc.common.Connection.create_consumer() and only applies
when the consumer was created with fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
returned and X is the Nth value that was returned by the remote
method.
:raises: nova.rpc.common.Timeout if a complete response is not received
before the timeout is reached.
"""
return _get_impl().multicall(context, topic, msg, timeout)
def notify(context, topic, msg):
"""Send notification event.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the notification to.
:param msg: This is a dict of content of event.
:returns: None
"""
return _get_impl().notify(context, topic, msg)
def cleanup():
"""Clean up resoruces in use by implementation.
Clean up any resources that have been allocated by the RPC implementation.
This is typically open connections to a messaging service. This function
would get called before an application using this API exits to allow
connections to get torn down cleanly.
:returns: None
"""
return _get_impl().cleanup()
def cast_to_server(context, server_params, topic, msg):
"""Invoke a remote method that does not return anything.
:param context: Information that identifies the user that has made this
request.
:param server_params: Connection information
:param topic: The topic to send the notification to.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:returns: None
"""
return _get_impl().cast_to_server(context, server_params, topic, msg)
def fanout_cast_to_server(context, server_params, topic, msg):
"""Broadcast to a remote method invocation with no return.
:param context: Information that identifies the user that has made this
request.
:param server_params: Connection information
:param topic: The topic to send the notification to.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:returns: None
"""
return _get_impl().fanout_cast_to_server(context, server_params, topic,
msg)
_RPCIMPL = None
def _get_impl():
"""Delay import of rpc_backend until FLAGS are loaded."""
global _RPCIMPL
if _RPCIMPL is None:
_RPCIMPL = utils.import_object(FLAGS.rpc_backend)
return _RPCIMPL

384
heat/rpc/amqp.py Normal file
View File

@ -0,0 +1,384 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2011 - 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Shared code between AMQP based nova.rpc implementations.
The code in this module is shared between the rpc implemenations based on AMQP.
Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
AMQP, but is deprecated and predates this code.
"""
import inspect
import logging
import sys
import traceback
import uuid
from eventlet import greenpool
from eventlet import pools
from heat import context
from heat.common import exception
from heat.common import config
from heat import local
import heat.rpc.common as rpc_common
LOG = logging.getLogger(__name__)
FLAGS = config.FLAGS
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
def __init__(self, *args, **kwargs):
self.connection_cls = kwargs.pop("connection_cls", None)
kwargs.setdefault("max_size", FLAGS.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
# TODO(comstud): Timeout connections not used in a while
def create(self):
LOG.debug('Pool creating new connection')
return self.connection_cls()
def empty(self):
while self.free_items:
self.get().close()
class ConnectionContext(rpc_common.Connection):
"""The class that is actually returned to the caller of
create_connection(). This is a essentially a wrapper around
Connection that supports 'with' and can return a new Connection or
one from a pool. It will also catch when an instance of this class
is to be deleted so that we can return Connections to the pool on
exceptions and so forth without making the caller be responsible for
catching all exceptions and making sure to return a connection to
the pool.
"""
def __init__(self, connection_pool, pooled=True, server_params=None):
"""Create a new connection, or get one from the pool"""
self.connection = None
self.connection_pool = connection_pool
if pooled:
self.connection = connection_pool.get()
else:
self.connection = connection_pool.connection_cls(
server_params=server_params)
self.pooled = pooled
def __enter__(self):
"""When with ConnectionContext() is used, return self"""
return self
def _done(self):
"""If the connection came from a pool, clean it up and put it back.
If it did not come from a pool, close it.
"""
if self.connection:
if self.pooled:
# Reset the connection so it's ready for the next caller
# to grab from the pool
self.connection.reset()
self.connection_pool.put(self.connection)
else:
try:
self.connection.close()
except Exception:
pass
self.connection = None
def __exit__(self, exc_type, exc_value, tb):
"""End of 'with' statement. We're done here."""
self._done()
def __del__(self):
"""Caller is done with this connection. Make sure we cleaned up."""
self._done()
def close(self):
"""Caller is done with this connection."""
self._done()
def create_consumer(self, topic, proxy, fanout=False):
self.connection.create_consumer(topic, proxy, fanout)
def consume_in_thread(self):
self.connection.consume_in_thread()
def __getattr__(self, key):
"""Proxy all other calls to the Connection instance"""
if self.connection:
return getattr(self.connection, key)
else:
raise exception.InvalidRPCConnectionReuse()
def msg_reply(msg_id, connection_pool, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
"""
with ConnectionContext(connection_pool) as conn:
if failure:
message = str(failure[1])
tb = traceback.format_exception(*failure)
LOG.error(_("Returning exception %s to caller"), message)
LOG.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
try:
msg = {'result': reply, 'failure': failure}
except TypeError:
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, msg)
class RpcContext(context.RequestContext):
"""Context that supports replying to a rpc.call"""
def __init__(self, *args, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
super(RpcContext, self).__init__(*args, **kwargs)
def reply(self, reply=None, failure=None, ending=False,
connection_pool=None):
if self.msg_id:
msg_reply(self.msg_id, connection_pool, reply, failure,
ending)
if ending:
self.msg_id = None
def unpack_context(msg):
"""Unpack context from msg."""
context_dict = {}
for key in list(msg.keys()):
# NOTE(vish): Some versions of python don't like unicode keys
# in kwargs.
key = str(key)
if key.startswith('_context_'):
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
ctx = RpcContext.from_dict(context_dict)
LOG.debug(_('unpacked context: %s'), ctx.to_dict())
return ctx
def pack_context(msg, context):
"""Pack context into msg.
Values for message keys need to be less than 255 chars, so we pull
context out into a bunch of separate keys. If we want to support
more arguments in rabbit messages, we may want to do the same
for args at some point.
"""
context_d = dict([('_context_%s' % key, value)
for (key, value) in context.to_dict().iteritems()])
msg.update(context_d)
class ProxyCallback(object):
"""Calls methods on a proxy object based on method and args."""
def __init__(self, proxy, connection_pool):
self.proxy = proxy
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
self.connection_pool = connection_pool
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
Parses the message for validity and fires off a thread to call the
proxy object method.
Message data should be a dictionary with two keys:
method: string representing the method to call
args: dictionary of arg: value
Example: {'method': 'echo', 'args': {'value': 42}}
"""
# It is important to clear the context here, because at this point
# the previous context is stored in local.store.context
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
ctxt = unpack_context(message_data)
method = message_data.get('method')
args = message_data.get('args', {})
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
self.pool.spawn_n(self._process_data, ctxt, method, args)
@exception.wrap_exception()
def _process_data(self, ctxt, method, args):
"""Thread that magically looks for a method on the proxy
object and calls it.
"""
ctxt.update_store()
try:
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
# NOTE(vish): magic is fun!
rval = node_func(context=ctxt, **node_args)
# Check if the result was a generator
if inspect.isgenerator(rval):
for x in rval:
ctxt.reply(x, None, connection_pool=self.connection_pool)
else:
ctxt.reply(rval, None, connection_pool=self.connection_pool)
# This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool)
except Exception as e:
LOG.exception('Exception during message handling')
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
return
class MulticallWaiter(object):
def __init__(self, connection, timeout):
self._connection = connection
self._iterator = connection.iterconsume(
timeout=timeout or FLAGS.rpc_response_timeout)
self._result = None
self._done = False
self._got_ending = False
def done(self):
if self._done:
return
self._done = True
self._iterator.close()
self._iterator = None
self._connection.close()
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
if data['failure']:
self._result = rpc_common.RemoteError(*data['failure'])
elif data.get('ending', False):
self._got_ending = True
else:
self._result = data['result']
def __iter__(self):
"""Return a result until we get a 'None' response from consumer"""
if self._done:
raise StopIteration
while True:
self._iterator.next()
if self._got_ending:
self.done()
raise StopIteration
result = self._result
if isinstance(result, Exception):
self.done()
raise result
yield result
def create_connection(new, connection_pool):
"""Create a connection"""
return ConnectionContext(connection_pool, pooled=not new)
def multicall(context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
LOG.debug(_('Making asynchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
pack_context(msg, context)
conn = ConnectionContext(connection_pool)
wait_msg = MulticallWaiter(conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg)
return wait_msg
def call(context, topic, msg, timeout, connection_pool):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg, timeout, connection_pool)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
return
return rv[-1]
def cast(context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context)
with ConnectionContext(connection_pool) as conn:
conn.topic_send(topic, msg)
def fanout_cast(context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context)
with ConnectionContext(connection_pool) as conn:
conn.fanout_send(topic, msg)
def cast_to_server(context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
pack_context(msg, context)
with ConnectionContext(connection_pool, pooled=False,
server_params=server_params) as conn:
conn.topic_send(topic, msg)
def fanout_cast_to_server(context, server_params, topic, msg,
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
pack_context(msg, context)
with ConnectionContext(connection_pool, pooled=False,
server_params=server_params) as conn:
conn.fanout_send(topic, msg)
def notify(context, topic, msg, connection_pool):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending notification on %s...'), topic)
pack_context(msg, context)
with ConnectionContext(connection_pool) as conn:
conn.notify_send(topic, msg)
def cleanup(connection_pool):
connection_pool.empty()

144
heat/rpc/common.py Normal file
View File

@ -0,0 +1,144 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import logging
from heat import exception
from heat.openstack.common import cfg
from heat.common import config
LOG = logging.getLogger(__name__)
rpc_opts = [
cfg.IntOpt('rpc_thread_pool_size',
default=1024,
help='Size of RPC thread pool'),
cfg.IntOpt('rpc_conn_pool_size',
default=30,
help='Size of RPC connection pool'),
cfg.IntOpt('rpc_response_timeout',
default=60,
help='Seconds to wait for a response from call or multicall'),
]
config.FLAGS.register_opts(rpc_opts)
class RemoteError(exception.NovaException):
"""Signifies that a remote class has raised an exception.
Contains a string representation of the type of the original exception,
the value of the original exception, and the traceback. These are
sent to the parent as a joined string so printing the exception
contains all of the relevant info.
"""
message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
def __init__(self, exc_type=None, value=None, traceback=None):
self.exc_type = exc_type
self.value = value
self.traceback = traceback
super(RemoteError, self).__init__(exc_type=exc_type,
value=value,
traceback=traceback)
class Timeout(exception.NovaException):
"""Signifies that a timeout has occurred.
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
message = _("Timeout while waiting on RPC response.")
class Connection(object):
"""A connection, returned by rpc.create_connection().
This class represents a connection to the message bus used for rpc.
An instance of this class should never be created by users of the rpc API.
Use rpc.create_connection() instead.
"""
def close(self):
"""Close the connection.
This method must be called when the connection will no longer be used.
It will ensure that any resources associated with the connection, such
as a network connection, and cleaned up.
"""
raise NotImplementedError()
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer on this connection.
A consumer is associated with a message queue on the backend message
bus. The consumer will read messages from the queue, unpack them, and
dispatch them to the proxy object. The contents of the message pulled
off of the queue will determine which method gets called on the proxy
object.
:param topic: This is a name associated with what to consume from.
Multiple instances of a service may consume from the same
topic. For example, all instances of nova-compute consume
from a queue called "compute". In that case, the
messages will get distributed amongst the consumers in a
round-robin fashion if fanout=False. If fanout=True,
every consumer associated with this topic will get a
copy of every message.
:param proxy: The object that will handle all incoming messages.
:param fanout: Whether or not this is a fanout topic. See the
documentation for the topic parameter for some
additional comments on this.
"""
raise NotImplementedError()
def consume_in_thread(self):
"""Spawn a thread to handle incoming messages.
Spawn a thread that will be responsible for handling all incoming
messages for consumers that were set up on this connection.
Message dispatching inside of this is expected to be implemented in a
non-blocking manner. An example implementation would be having this
thread pull messages in for all of the consumers, but utilize a thread
pool for dispatching the messages to the proxy objects.
"""
raise NotImplementedError()
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
SANITIZE = {
'set_admin_password': ('new_pass',),
'run_instance': ('admin_password',),
}
method = msg_data['method']
if method in SANITIZE:
msg_data = copy.deepcopy(msg_data)
args_to_sanitize = SANITIZE[method]
for arg in args_to_sanitize:
try:
msg_data['args'][arg] = "<SANITIZED>"
except KeyError:
pass
return log_func(msg, msg_data)

188
heat/rpc/impl_fake.py Normal file
View File

@ -0,0 +1,188 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Fake RPC implementation which calls proxy methods directly with no
queues. Casts will block, but this is very useful for tests.
"""
import inspect
import json
import signal
import sys
import time
import traceback
import eventlet
from heat import context
from heat.common import config
from heat.rpc import common as rpc_common
CONSUMERS = {}
FLAGS = config.FLAGS
class RpcContext(context.RequestContext):
def __init__(self, *args, **kwargs):
super(RpcContext, self).__init__(*args, **kwargs)
self._response = []
self._done = False
def reply(self, reply=None, failure=None, ending=False):
if ending:
self._done = True
if not self._done:
self._response.append((reply, failure))
class Consumer(object):
def __init__(self, topic, proxy):
self.topic = topic
self.proxy = proxy
def call(self, context, method, args, timeout):
node_func = getattr(self.proxy, method)
node_args = dict((str(k), v) for k, v in args.iteritems())
done = eventlet.event.Event()
def _inner():
ctxt = RpcContext.from_dict(context.to_dict())
try:
rval = node_func(context=ctxt, **node_args)
res = []
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
if failure:
raise failure[0], failure[1], failure[2]
res.append(reply)
# if ending not 'sent'...we might have more data to
# return from the function itself
if not ctxt._done:
if inspect.isgenerator(rval):
for val in rval:
res.append(val)
else:
res.append(rval)
done.send(res)
except Exception:
exc_info = sys.exc_info()
done.send_exception(
rpc_common.RemoteError(exc_info[0].__name__,
str(exc_info[1]),
''.join(traceback.format_exception(*exc_info))))
thread = eventlet.greenthread.spawn(_inner)
if timeout:
start_time = time.time()
while not done.ready():
eventlet.greenthread.sleep(1)
cur_time = time.time()
if (cur_time - start_time) > timeout:
thread.kill()
raise rpc_common.Timeout()
return done.wait()
class Connection(object):
"""Connection object."""
def __init__(self):
self.consumers = []
def create_consumer(self, topic, proxy, fanout=False):
consumer = Consumer(topic, proxy)
self.consumers.append(consumer)
if topic not in CONSUMERS:
CONSUMERS[topic] = []
CONSUMERS[topic].append(consumer)
def close(self):
for consumer in self.consumers:
CONSUMERS[consumer.topic].remove(consumer)
self.consumers = []
def consume_in_thread(self):
pass
def create_connection(new=True):
"""Create a connection"""
return Connection()
def check_serialize(msg):
"""Make sure a message intended for rpc can be serialized."""
json.dumps(msg)
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
check_serialize(msg)
method = msg.get('method')
if not method:
return
args = msg.get('args', {})
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
return iter([None])
else:
return consumer.call(context, method, args, timeout)
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg, timeout)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
return
return rv[-1]
def cast(context, topic, msg):
try:
call(context, topic, msg)
except rpc_common.RemoteError:
pass
def notify(context, topic, msg):
check_serialize(msg)
def cleanup():
pass
def fanout_cast(context, topic, msg):
"""Cast to all consumers of a topic"""
check_serialize(msg)
method = msg.get('method')
if not method:
return
args = msg.get('args', {})
for consumer in CONSUMERS.get(topic, []):
try:
consumer.call(context, method, args, None)
except rpc_common.RemoteError:
pass

705
heat/rpc/impl_kombu.py Normal file
View File

@ -0,0 +1,705 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import socket
import ssl
import sys
import time
import uuid
import eventlet
import greenlet
import kombu
import kombu.entity
import kombu.messaging
import kombu.connection
from heat.common import config
from heat.openstack.common import cfg
from heat.rpc import amqp as rpc_amqp
from heat.rpc import common as rpc_common
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
default='',
help='SSL version to use (valid only if SSL enabled)'),
cfg.StrOpt('kombu_ssl_keyfile',
default='',
help='SSL key file (valid only if SSL enabled)'),
cfg.StrOpt('kombu_ssl_certfile',
default='',
help='SSL cert file (valid only if SSL enabled)'),
cfg.StrOpt('kombu_ssl_ca_certs',
default='',
help=('SSL certification authority file '
'(valid only if SSL enabled)')),
]
FLAGS = config.FLAGS
FLAGS.register_opts(kombu_opts)
LOG = rpc_common.LOG
class ConsumerBase(object):
"""Consumer base class."""
def __init__(self, channel, callback, tag, **kwargs):
"""Declare a queue on an amqp channel.
'channel' is the amqp channel to use
'callback' is the callback to call when messages are received
'tag' is a unique ID for the consumer on the channel
queue name, exchange name, and other kombu options are
passed in here as a dictionary.
"""
self.callback = callback
self.tag = str(tag)
self.kwargs = kwargs
self.queue = None
self.reconnect(channel)
def reconnect(self, channel):
"""Re-declare the queue after a rabbit reconnect"""
self.channel = channel
self.kwargs['channel'] = channel
self.queue = kombu.entity.Queue(**self.kwargs)
self.queue.declare()
def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will
start the flow of messages from the queue. Using the
Connection.iterconsume() iterator will process the messages,
calling the appropriate callback.
If a callback is specified in kwargs, use that. Otherwise,
use the callback passed during __init__()
If kwargs['nowait'] is True, then this call will block until
a message is read.
Messages will automatically be acked if the callback doesn't
raise an exception
"""
options = {'consumer_tag': self.tag}
options['nowait'] = kwargs.get('nowait', False)
callback = kwargs.get('callback', self.callback)
if not callback:
raise ValueError("No callback defined")
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
callback(message.payload)
message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
self.queue.consume(*args, callback=_callback, **options)
def cancel(self):
"""Cancel the consuming from the queue, if it has started"""
try:
self.queue.cancel(self.tag)
except KeyError, e:
# NOTE(comstud): Kludge to get around a amqplib bug
if str(e) != "u'%s'" % self.tag:
raise
self.queue = None
class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'"""
def __init__(self, channel, msg_id, callback, tag, **kwargs):
"""Init a 'direct' queue.
'channel' is the amqp channel to use
'msg_id' is the msg_id to listen on
'callback' is the callback to call when messages are received
'tag' is a unique ID for the consumer on the channel
Other kombu options may be passed
"""
# Default options
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
exchange = kombu.entity.Exchange(
name=msg_id,
type='direct',
durable=options['durable'],
auto_delete=options['auto_delete'])
super(DirectConsumer, self).__init__(
channel,
callback,
tag,
name=msg_id,
exchange=exchange,
routing_key=msg_id,
**options)
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
def __init__(self, channel, topic, callback, tag, **kwargs):
"""Init a 'topic' queue.
'channel' is the amqp channel to use
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
'tag' is a unique ID for the consumer on the channel
Other kombu options may be passed
"""
# Default options
options = {'durable': FLAGS.rabbit_durable_queues,
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(
name=FLAGS.control_exchange,
type='topic',
durable=options['durable'],
auto_delete=options['auto_delete'])
super(TopicConsumer, self).__init__(
channel,
callback,
tag,
name=topic,
exchange=exchange,
routing_key=topic,
**options)
class FanoutConsumer(ConsumerBase):
"""Consumer class for 'fanout'"""
def __init__(self, channel, topic, callback, tag, **kwargs):
"""Init a 'fanout' queue.
'channel' is the amqp channel to use
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
'tag' is a unique ID for the consumer on the channel
Other kombu options may be passed
"""
unique = uuid.uuid4().hex
exchange_name = '%s_fanout' % topic
queue_name = '%s_fanout_%s' % (topic, unique)
# Default options
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
exchange = kombu.entity.Exchange(
name=exchange_name,
type='fanout',
durable=options['durable'],
auto_delete=options['auto_delete'])
super(FanoutConsumer, self).__init__(
channel,
callback,
tag,
name=queue_name,
exchange=exchange,
routing_key=topic,
**options)
class Publisher(object):
"""Base Publisher class"""
def __init__(self, channel, exchange_name, routing_key, **kwargs):
"""Init the Publisher class with the exchange_name, routing_key,
and other options
"""
self.exchange_name = exchange_name
self.routing_key = routing_key
self.kwargs = kwargs
self.reconnect(channel)
def reconnect(self, channel):
"""Re-establish the Producer after a rabbit reconnection"""
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
**self.kwargs)
self.producer = kombu.messaging.Producer(exchange=self.exchange,
channel=channel, routing_key=self.routing_key)
def send(self, msg):
"""Send a message"""
self.producer.publish(msg)
class DirectPublisher(Publisher):
"""Publisher class for 'direct'"""
def __init__(self, channel, msg_id, **kwargs):
"""init a 'direct' publisher.
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel,
msg_id,
msg_id,
type='direct',
**options)
class TopicPublisher(Publisher):
"""Publisher class for 'topic'"""
def __init__(self, channel, topic, **kwargs):
"""init a 'topic' publisher.
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': FLAGS.rabbit_durable_queues,
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
super(TopicPublisher, self).__init__(channel,
FLAGS.control_exchange,
topic,
type='topic',
**options)
class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'"""
def __init__(self, channel, topic, **kwargs):
"""init a 'fanout' publisher.
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
super(FanoutPublisher, self).__init__(channel,
'%s_fanout' % topic,
None,
type='fanout',
**options)
class NotifyPublisher(TopicPublisher):
"""Publisher class for 'notify'"""
def __init__(self, *args, **kwargs):
self.durable = kwargs.pop('durable', FLAGS.rabbit_durable_queues)
super(NotifyPublisher, self).__init__(*args, **kwargs)
def reconnect(self, channel):
super(NotifyPublisher, self).reconnect(channel)
# NOTE(jerdfelt): Normally the consumer would create the queue, but
# we do this to ensure that messages don't get dropped if the
# consumer is started after we do
queue = kombu.entity.Queue(channel=channel,
exchange=self.exchange,
durable=self.durable,
name=self.routing_key,
routing_key=self.routing_key)
queue.declare()
class Connection(object):
"""Connection object."""
def __init__(self, server_params=None):
self.consumers = []
self.consumer_thread = None
self.max_retries = FLAGS.rabbit_max_retries
# Try forever?
if self.max_retries <= 0:
self.max_retries = None
self.interval_start = FLAGS.rabbit_retry_interval
self.interval_stepping = FLAGS.rabbit_retry_backoff
# max retry-interval = 30 seconds
self.interval_max = 30
self.memory_transport = False
if server_params is None:
server_params = {}
# Keys to translate from server_params to kombu params
server_params_to_kombu_params = {'username': 'userid'}
params = {}
for sp_key, value in server_params.iteritems():
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
params.setdefault('hostname', FLAGS.rabbit_host)
params.setdefault('port', FLAGS.rabbit_port)
params.setdefault('userid', FLAGS.rabbit_userid)
params.setdefault('password', FLAGS.rabbit_password)
params.setdefault('virtual_host', FLAGS.rabbit_virtual_host)
self.params = params
if FLAGS.fake_rabbit:
self.params['transport'] = 'memory'
self.memory_transport = True
else:
self.memory_transport = False
if FLAGS.rabbit_use_ssl:
self.params['ssl'] = self._fetch_ssl_params()
self.connection = None
self.reconnect()
def _fetch_ssl_params(self):
"""Handles fetching what ssl params
should be used for the connection (if any)"""
ssl_params = dict()
# http://docs.python.org/library/ssl.html - ssl.wrap_socket
if FLAGS.kombu_ssl_version:
ssl_params['ssl_version'] = FLAGS.kombu_ssl_version
if FLAGS.kombu_ssl_keyfile:
ssl_params['keyfile'] = FLAGS.kombu_ssl_keyfile
if FLAGS.kombu_ssl_certfile:
ssl_params['certfile'] = FLAGS.kombu_ssl_certfile
if FLAGS.kombu_ssl_ca_certs:
ssl_params['ca_certs'] = FLAGS.kombu_ssl_ca_certs
# We might want to allow variations in the
# future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
if not ssl_params:
# Just have the default behavior
return True
else:
# Return the extended behavior
return ssl_params
def _connect(self):
"""Connect to rabbit. Re-establish any queues that may have
been declared before if we are reconnecting. Exceptions should
be handled by the caller.
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % self.params)
try:
self.connection.close()
except self.connection_errors:
pass
# Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet.
self.connection = None
self.connection = kombu.connection.BrokerConnection(
**self.params)
self.connection_errors = self.connection.connection_errors
if self.memory_transport:
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
self.consumer_num = itertools.count(1)
self.connection.connect()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
if self.memory_transport:
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
LOG.info(_('Connected to AMQP server on '
'%(hostname)s:%(port)d') % self.params)
def reconnect(self):
"""Handles reconnecting and re-establishing queues.
Will retry up to self.max_retries number of times.
self.max_retries = 0 means to retry forever.
Sleep between tries, starting at self.interval_start
seconds, backing off self.interval_stepping number of seconds
each attempt.
"""
attempt = 0
while True:
attempt += 1
try:
self._connect()
return
except self.connection_errors, e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
# a protocol response. (See paste link in LP888621)
# So, we check all exceptions for 'timeout' in them
# and try to reconnect in this case.
if 'timeout' not in str(e):
raise
log_info = {}
log_info['err_str'] = str(e)
log_info['max_retries'] = self.max_retries
log_info.update(self.params)
if self.max_retries and attempt == self.max_retries:
LOG.exception(_('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info)
# NOTE(comstud): Copied from original code. There's
# really no better recourse because if this was a queue we
# need to consume on, we have no way to consume anymore.
sys.exit(1)
if attempt == 1:
sleep_time = self.interval_start or 1
elif attempt > 1:
sleep_time += self.interval_stepping
if self.interval_max:
sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time
LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
' unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs):
while True:
try:
return method(*args, **kwargs)
except (self.connection_errors, socket.timeout), e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
# a protocol response. (See paste link in LP888621)
# So, we check all exceptions for 'timeout' in them
# and try to reconnect in this case.
if 'timeout' not in str(e):
raise
if error_callback:
error_callback(e)
self.reconnect()
def get_channel(self):
"""Convenience call for bin/clear_rabbit_queues"""
return self.channel
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
self.channel.close()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
if self.memory_transport:
self.channel._new_queue('ae.undeliver')
self.consumers = []
def declare_consumer(self, consumer_cls, topic, callback):
"""Create a Consumer using the class that was passed in and
add it to our list of consumers
"""
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s") % log_info)
def _declare_consumer():
consumer = consumer_cls(self.channel, topic, callback,
self.consumer_num.next())
self.consumers.append(consumer)
return consumer
return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers"""
info = {'do_consume': True}
def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
info['do_consume'] = True
def _consume():
if info['do_consume']:
queues_head = self.consumers[:-1]
queues_tail = self.consumers[-1]
for queue in queues_head:
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
info['do_consume'] = False
return self.connection.drain_events(timeout=timeout)
for iteration in itertools.count(0):
if limit and iteration >= limit:
raise StopIteration
yield self.ensure(_error_callback, _consume)
def cancel_consumer_thread(self):
"""Cancel a consumer thread"""
if self.consumer_thread is not None:
self.consumer_thread.kill()
try:
self.consumer_thread.wait()
except greenlet.GreenletExit:
pass
self.consumer_thread = None
def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class"""
def _error_callback(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s") % log_info)
def _publish():
publisher = cls(self.channel, topic, **kwargs)
publisher.send(msg)
self.ensure(_error_callback, _publish)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
In nova's use, this is generally a msg_id queue used for
responses for call/multicall
"""
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None):
"""Create a 'topic' consumer."""
self.declare_consumer(TopicConsumer, topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
self.declare_consumer(FanoutConsumer, topic, callback)
def direct_send(self, msg_id, msg):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg):
"""Send a 'topic' message"""
self.publisher_send(TopicPublisher, topic, msg)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
self.publisher_send(FanoutPublisher, topic, msg)
def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic"""
self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
def consume(self, limit=None):
"""Consume from all queues/consumers"""
it = self.iterconsume(limit=limit)
while True:
try:
it.next()
except StopIteration:
return
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
def _consumer_thread():
try:
self.consume()
except greenlet.GreenletExit:
return
if self.consumer_thread is None:
self.consumer_thread = eventlet.spawn(_consumer_thread)
return self.consumer_thread
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
if fanout:
self.declare_fanout_consumer(topic,
rpc_amqp.ProxyCallback(proxy, Connection.pool))
else:
self.declare_topic_consumer(topic,
rpc_amqp.ProxyCallback(proxy, Connection.pool))
Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
def create_connection(new=True):
"""Create a connection"""
return rpc_amqp.create_connection(new, Connection.pool)
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
return rpc_amqp.cast(context, topic, msg, Connection.pool)
def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
def cast_to_server(context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
return rpc_amqp.cast_to_server(context, server_params, topic, msg,
Connection.pool)
def fanout_cast_to_server(context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
return rpc_amqp.cast_to_server(context, server_params, topic, msg,
Connection.pool)
def notify(context, topic, msg):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg, Connection.pool)
def cleanup():
return rpc_amqp.cleanup(Connection.pool)

552
heat/rpc/impl_qpid.py Normal file
View File

@ -0,0 +1,552 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
# Copyright 2011 - 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import time
import uuid
import json
import logging
import eventlet
import greenlet
import qpid.messaging
import qpid.messaging.exceptions
from heat.common import config
from heat.openstack.common import cfg
from heat.rpc import amqp as rpc_amqp
from heat.rpc import common as rpc_common
LOG = logging.getLogger(__name__)
qpid_opts = [
cfg.StrOpt('qpid_hostname',
default='localhost',
help='Qpid broker hostname'),
cfg.StrOpt('qpid_port',
default='5672',
help='Qpid broker port'),
cfg.StrOpt('qpid_username',
default='',
help='Username for qpid connection'),
cfg.StrOpt('qpid_password',
default='',
help='Password for qpid connection'),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
help='Space separated list of SASL mechanisms to use for auth'),
cfg.BoolOpt('qpid_reconnect',
default=True,
help='Automatically reconnect'),
cfg.IntOpt('qpid_reconnect_timeout',
default=0,
help='Reconnection timeout in seconds'),
cfg.IntOpt('qpid_reconnect_limit',
default=0,
help='Max reconnections before giving up'),
cfg.IntOpt('qpid_reconnect_interval_min',
default=0,
help='Minimum seconds between reconnection attempts'),
cfg.IntOpt('qpid_reconnect_interval_max',
default=0,
help='Maximum seconds between reconnection attempts'),
cfg.IntOpt('qpid_reconnect_interval',
default=0,
help='Equivalent to setting max and min to the same value'),
cfg.IntOpt('qpid_heartbeat',
default=5,
help='Seconds between connection keepalive heartbeats'),
cfg.StrOpt('qpid_protocol',
default='tcp',
help="Transport to use, either 'tcp' or 'ssl'"),
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
help='Disable Nagle algorithm'),
]
FLAGS = config.FLAGS
FLAGS.register_opts(qpid_opts)
class ConsumerBase(object):
"""Consumer base class."""
def __init__(self, session, callback, node_name, node_opts,
link_name, link_opts):
"""Declare a queue on an amqp session.
'session' is the amqp session to use
'callback' is the callback to call when messages are received
'node_name' is the first part of the Qpid address string, before ';'
'node_opts' will be applied to the "x-declare" section of "node"
in the address string.
'link_name' goes into the "name" field of the "link" in the address
string
'link_opts' will be applied to the "x-declare" section of "link"
in the address string.
"""
self.callback = callback
self.receiver = None
self.session = None
addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
"durable": True,
"auto-delete": True,
},
},
"link": {
"name": link_name,
"durable": True,
"x-declare": {
"durable": False,
"auto-delete": True,
"exclusive": False,
},
},
}
addr_opts["node"]["x-declare"].update(node_opts)
addr_opts["link"]["x-declare"].update(link_opts)
self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
self.reconnect(session)
def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect"""
self.session = session
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1
def consume(self):
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
self.callback(message.content)
def get_receiver(self):
return self.receiver
class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'"""
def __init__(self, session, msg_id, callback):
"""Init a 'direct' queue.
'session' is the amqp session to use
'msg_id' is the msg_id to listen on
'callback' is the callback to call when messages are received
"""
super(DirectConsumer, self).__init__(session, callback,
"%s/%s" % (msg_id, msg_id),
{"type": "direct"},
msg_id,
{"exclusive": True})
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
def __init__(self, session, topic, callback):
"""Init a 'topic' queue.
'session' is the amqp session to use
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
"""
super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (FLAGS.control_exchange, topic), {},
topic, {})
class FanoutConsumer(ConsumerBase):
"""Consumer class for 'fanout'"""
def __init__(self, session, topic, callback):
"""Init a 'fanout' queue.
'session' is the amqp session to use
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
"""
super(FanoutConsumer, self).__init__(session, callback,
"%s_fanout" % topic,
{"durable": False, "type": "fanout"},
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True})
class Publisher(object):
"""Base Publisher class"""
def __init__(self, session, node_name, node_opts=None):
"""Init the Publisher class with the exchange_name, routing_key,
and other options
"""
self.sender = None
self.session = session
addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
"durable": False,
# auto-delete isn't implemented for exchanges in qpid,
# but put in here anyway
"auto-delete": True,
},
},
}
if node_opts:
addr_opts["node"]["x-declare"].update(node_opts)
self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
self.reconnect(session)
def reconnect(self, session):
"""Re-establish the Sender after a reconnection"""
self.sender = session.sender(self.address)
def send(self, msg):
"""Send a message"""
self.sender.send(msg)
class DirectPublisher(Publisher):
"""Publisher class for 'direct'"""
def __init__(self, session, msg_id):
"""Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id,
{"type": "Direct"})
class TopicPublisher(Publisher):
"""Publisher class for 'topic'"""
def __init__(self, session, topic):
"""init a 'topic' publisher.
"""
super(TopicPublisher, self).__init__(session,
"%s/%s" % (FLAGS.control_exchange, topic))
class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'"""
def __init__(self, session, topic):
"""init a 'fanout' publisher.
"""
super(FanoutPublisher, self).__init__(session,
"%s_fanout" % topic, {"type": "fanout"})
class NotifyPublisher(Publisher):
"""Publisher class for notifications"""
def __init__(self, session, topic):
"""init a 'topic' publisher.
"""
super(NotifyPublisher, self).__init__(session,
"%s/%s" % (FLAGS.control_exchange, topic),
{"durable": True})
class Connection(object):
"""Connection object."""
def __init__(self, server_params=None):
self.session = None
self.consumers = {}
self.consumer_thread = None
if server_params is None:
server_params = {}
default_params = dict(hostname=FLAGS.qpid_hostname,
port=FLAGS.qpid_port,
username=FLAGS.qpid_username,
password=FLAGS.qpid_password)
params = server_params
for key in default_params.keys():
params.setdefault(key, default_params[key])
self.broker = params['hostname'] + ":" + str(params['port'])
# Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(self.broker)
# Check if flags are set and if so set them for the connection
# before we call open
self.connection.username = params['username']
self.connection.password = params['password']
self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms
self.connection.reconnect = FLAGS.qpid_reconnect
if FLAGS.qpid_reconnect_timeout:
self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout
if FLAGS.qpid_reconnect_limit:
self.connection.reconnect_limit = FLAGS.qpid_reconnect_limit
if FLAGS.qpid_reconnect_interval_max:
self.connection.reconnect_interval_max = (
FLAGS.qpid_reconnect_interval_max)
if FLAGS.qpid_reconnect_interval_min:
self.connection.reconnect_interval_min = (
FLAGS.qpid_reconnect_interval_min)
if FLAGS.qpid_reconnect_interval:
self.connection.reconnect_interval = FLAGS.qpid_reconnect_interval
self.connection.hearbeat = FLAGS.qpid_heartbeat
self.connection.protocol = FLAGS.qpid_protocol
self.connection.tcp_nodelay = FLAGS.qpid_tcp_nodelay
# Open is part of reconnect -
# NOTE(WGH) not sure we need this with the reconnect flags
self.reconnect()
def _register_consumer(self, consumer):
self.consumers[str(consumer.get_receiver())] = consumer
def _lookup_consumer(self, receiver):
return self.consumers[str(receiver)]
def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues"""
if self.connection.opened():
try:
self.connection.close()
except qpid.messaging.exceptions.ConnectionError:
pass
while True:
try:
self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e:
LOG.error(_('Unable to connect to AMQP server: %s ') % e)
time.sleep(FLAGS.qpid_reconnect_interval or 1)
else:
break
LOG.info(_('Connected to AMQP server on %s') % self.broker)
self.session = self.connection.session()
for consumer in self.consumers.itervalues():
consumer.reconnect(self.session)
if self.consumers:
LOG.debug(_("Re-established AMQP queues"))
def ensure(self, error_callback, method, *args, **kwargs):
while True:
try:
return method(*args, **kwargs)
except (qpid.messaging.exceptions.Empty,
qpid.messaging.exceptions.ConnectionError), e:
if error_callback:
error_callback(e)
self.reconnect()
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
self.connection.close()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
self.session.close()
self.session = self.connection.session()
self.consumers = {}
def declare_consumer(self, consumer_cls, topic, callback):
"""Create a Consumer using the class that was passed in and
add it to our list of consumers
"""
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s") % log_info)
def _declare_consumer():
consumer = consumer_cls(self.session, topic, callback)
self._register_consumer(consumer)
return consumer
return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc):
if isinstance(exc, qpid.messaging.exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
def _consume():
nxt_receiver = self.session.next_receiver(timeout=timeout)
try:
self._lookup_consumer(nxt_receiver).consume()
except Exception:
LOG.exception(_("Error processing message. Skipping it."))
for iteration in itertools.count(0):
if limit and iteration >= limit:
raise StopIteration
yield self.ensure(_error_callback, _consume)
def cancel_consumer_thread(self):
"""Cancel a consumer thread"""
if self.consumer_thread is not None:
self.consumer_thread.kill()
try:
self.consumer_thread.wait()
except greenlet.GreenletExit:
pass
self.consumer_thread = None
def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class"""
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s") % log_info)
def _publisher_send():
publisher = cls(self.session, topic)
publisher.send(msg)
return self.ensure(_connect_error, _publisher_send)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
In nova's use, this is generally a msg_id queue used for
responses for call/multicall
"""
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None):
"""Create a 'topic' consumer."""
self.declare_consumer(TopicConsumer, topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
self.declare_consumer(FanoutConsumer, topic, callback)
def direct_send(self, msg_id, msg):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg):
"""Send a 'topic' message"""
self.publisher_send(TopicPublisher, topic, msg)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
self.publisher_send(FanoutPublisher, topic, msg)
def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic"""
self.publisher_send(NotifyPublisher, topic, msg)
def consume(self, limit=None):
"""Consume from all queues/consumers"""
it = self.iterconsume(limit=limit)
while True:
try:
it.next()
except StopIteration:
return
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
def _consumer_thread():
try:
self.consume()
except greenlet.GreenletExit:
return
if self.consumer_thread is None:
self.consumer_thread = eventlet.spawn(_consumer_thread)
return self.consumer_thread
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
if fanout:
consumer = FanoutConsumer(self.session, topic,
rpc_amqp.ProxyCallback(proxy, Connection.pool))
else:
consumer = TopicConsumer(self.session, topic,
rpc_amqp.ProxyCallback(proxy, Connection.pool))
self._register_consumer(consumer)
return consumer
Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
def create_connection(new=True):
"""Create a connection"""
return rpc_amqp.create_connection(new, Connection.pool)
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
return rpc_amqp.cast(context, topic, msg, Connection.pool)
def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
def cast_to_server(context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
return rpc_amqp.cast_to_server(context, server_params, topic, msg,
Connection.pool)
def fanout_cast_to_server(context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
return rpc_amqp.fanout_cast_to_server(context, server_params, topic,
msg, Connection.pool)
def notify(context, topic, msg):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg, Connection.pool)
def cleanup():
return rpc_amqp.cleanup(Connection.pool)

275
heat/service.py Normal file
View File

@ -0,0 +1,275 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Generic Node base class for all workers that run on hosts."""
import inspect
import os
import eventlet
import logging
import greenlet
from heat.openstack.common import cfg
from heat.common import utils
from heat.common import config
from heat import context
from heat import exception
from heat import rpc
from heat import version
LOG = logging.getLogger(__name__)
service_opts = [
cfg.IntOpt('report_interval',
default=10,
help='seconds between nodes reporting state to datastore'),
cfg.IntOpt('periodic_interval',
default=60,
help='seconds between running periodic tasks'),
cfg.StrOpt('ec2_listen',
default="0.0.0.0",
help='IP address for EC2 API to listen'),
cfg.IntOpt('ec2_listen_port',
default=8773,
help='port for ec2 api to listen'),
cfg.StrOpt('osapi_compute_listen',
default="0.0.0.0",
help='IP address for OpenStack API to listen'),
cfg.IntOpt('osapi_compute_listen_port',
default=8774,
help='list port for osapi compute'),
cfg.StrOpt('metadata_manager',
default='nova.api.manager.MetadataManager',
help='OpenStack metadata service manager'),
cfg.StrOpt('metadata_listen',
default="0.0.0.0",
help='IP address for metadata api to listen'),
cfg.IntOpt('metadata_listen_port',
default=8775,
help='port for metadata api to listen'),
cfg.StrOpt('osapi_volume_listen',
default="0.0.0.0",
help='IP address for OpenStack Volume API to listen'),
cfg.IntOpt('osapi_volume_listen_port',
default=8776,
help='port for os volume api to listen'),
]
FLAGS = config.FLAGS
FLAGS.register_opts(service_opts)
class Launcher(object):
"""Launch one or more services and wait for them to complete."""
def __init__(self):
"""Initialize the service launcher.
:returns: None
"""
self._services = []
@staticmethod
def run_server(server):
"""Start and wait for a server to finish.
:param service: Server to run and wait for.
:returns: None
"""
server.start()
server.wait()
def launch_server(self, server):
"""Load and start the given server.
:param server: The server you would like to start.
:returns: None
"""
gt = eventlet.spawn(self.run_server, server)
self._services.append(gt)
def stop(self):
"""Stop all services which are currently running.
:returns: None
"""
for service in self._services:
service.kill()
def wait(self):
"""Waits until all services have been stopped, and then returns.
:returns: None
"""
for service in self._services:
try:
service.wait()
except greenlet.GreenletExit:
pass
class Service(object):
"""Service object for binaries running on hosts.
A service takes a manager and enables rpc by listening to queues based
on topic. It also periodically runs tasks on the manager and reports
it state to the database services table."""
def __init__(self, host, binary, topic, manager,
periodic_interval=None, *args, **kwargs):
self.host = host
self.binary = binary
self.topic = topic
self.manager_class_name = manager
manager_class = utils.import_class(self.manager_class_name)
self.manager = manager_class(host=self.host, *args, **kwargs)
self.periodic_interval = periodic_interval
super(Service, self).__init__(*args, **kwargs)
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
def start(self):
vcs_string = version.version_string_with_vcs()
LOG.info(_('Starting %(topic)s node (version %(vcs_string)s)'),
{'topic': self.topic, 'vcs_string': vcs_string})
# TODO do we need this ? -> utils.cleanup_file_locks()
self.manager.init_host()
self.model_disconnected = False
ctxt = context.get_admin_context()
# self._create_service_ref(ctxt)
self.conn = rpc.create_connection(new=True)
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, self, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, self, fanout=False)
self.conn.create_consumer(self.topic, self, fanout=True)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
if self.periodic_interval:
periodic = utils.LoopingCall(self.periodic_tasks)
periodic.start(interval=self.periodic_interval, now=False)
self.timers.append(periodic)
def __getattr__(self, key):
manager = self.__dict__.get('manager', None)
return getattr(manager, key)
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
periodic_interval=None):
"""Instantiates class and passes back application object.
:param host: defaults to FLAGS.host
:param binary: defaults to basename of executable
:param topic: defaults to bin_name - 'heat-' part
:param manager: defaults to FLAGS.<topic>_manager
:param periodic_interval: defaults to FLAGS.periodic_interval
"""
if not host:
host = FLAGS.host
if not binary:
binary = os.path.basename(inspect.stack()[-1][1])
if not topic:
topic = binary.rpartition('heat-')[2]
if not manager:
manager = FLAGS.get('%s_manager' % topic, None)
if not periodic_interval:
periodic_interval = FLAGS.periodic_interval
service_obj = cls(host, binary, topic, manager,
periodic_interval)
return service_obj
def kill(self):
self.stop()
def stop(self):
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.conn.close()
except Exception:
pass
for x in self.timers:
try:
x.stop()
except Exception:
pass
self.timers = []
def wait(self):
for x in self.timers:
try:
x.wait()
except Exception:
pass
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
ctxt = context.get_admin_context()
self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
# NOTE(vish): the global launcher is to maintain the existing
# functionality of calling service.serve +
# service.wait
_launcher = None
def serve(*servers):
global _launcher
if not _launcher:
_launcher = Launcher()
for server in servers:
_launcher.launch_server(server)
def wait():
LOG.debug(_('Full set of FLAGS:'))
for flag in FLAGS:
flag_get = FLAGS.get(flag, None)
# hide flag contents from log if contains a password
# should use secret flag when switch over to openstack-common
if ("_password" in flag or "_key" in flag or
(flag == "sql_connection" and "mysql:" in flag_get)):
LOG.debug(_('%(flag)s : FLAG SET ') % locals())
else:
LOG.debug('%(flag)s : %(flag_get)s' % locals())
try:
_launcher.wait()
except KeyboardInterrupt:
_launcher.stop()
rpc.cleanup()