diff --git a/bin/heat-engine b/bin/heat-engine index df0176b595..7f9bbf607d 100755 --- a/bin/heat-engine +++ b/bin/heat-engine @@ -21,34 +21,37 @@ which then calls into this engine. """ import gettext +import eventlet +eventlet.monkey_patch() + import os import sys +import logging # If ../heat/__init__.py exists, add ../ to Python search path, so that # it will override what happens to be installed in /usr/(local/)lib/python... -possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), +POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), os.pardir, os.pardir)) -if os.path.exists(os.path.join(possible_topdir, 'heat', '__init__.py')): - sys.path.insert(0, possible_topdir) +if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'heat', '__init__.py')): + sys.path.insert(0, POSSIBLE_TOPDIR) gettext.install('heat', unicode=1) +from heat import service +from heat.common import utils from heat.common import config -from heat.common import wsgi +logger = logging.getLogger('heat.engine') if __name__ == '__main__': - try: - conf = config.HeatConfigOpts() - conf() - app = config.load_paste_app(conf) + config.FLAGS(sys.argv) + config.setup_logging(config.FLAGS) - port = config.DEFAULT_PORT+1 - print 'Starting Heat Engine on port %s' % port - server = wsgi.Server() - server.start(app, conf, default_port=port) - server.wait() - except RuntimeError, e: - sys.exit("ERROR: %s" % e) + #utils.monkey_patch() + server = service.Service.create(binary='heat-engine', + topic='engine', + manager='heat.engine.manager.EngineManager') + service.serve(server) + service.wait() diff --git a/heat/common/config.py b/heat/common/config.py index 5f4466375e..eebe86ccfe 100644 --- a/heat/common/config.py +++ b/heat/common/config.py @@ -22,6 +22,7 @@ import logging import logging.config import logging.handlers import os +import socket import sys from heat import version @@ -37,6 +38,7 @@ paste_deploy_opts = [ ] + class HeatConfigOpts(cfg.CommonConfigOpts): def 
__init__(self, default_config_files=None, **kwargs): @@ -46,13 +48,27 @@ class HeatConfigOpts(cfg.CommonConfigOpts): default_config_files=default_config_files, **kwargs) +class HeatEngineConfigOpts(cfg.CommonConfigOpts): + engine_opts = [ + cfg.StrOpt('host', + default=socket.gethostname(), + help='Name of this node. This can be an opaque identifier. ' + 'It is not necessarily a hostname, FQDN, or IP address.'), + cfg.StrOpt('instance_driver', + default='heat.engine.nova', + help='Driver to use for controlling instances'), + ] -class HeatCacheConfigOpts(HeatConfigOpts): - - def __init__(self, **kwargs): + def __init__(self, default_config_files=None, **kwargs): + super(HeatEngineConfigOpts, self).__init__( + project='heat', + version='%%prog %s' % version.version_string(), + **kwargs) config_files = cfg.find_config_files(project='heat', - prog='heat-cache') - super(HeatCacheConfigOpts, self).__init__(config_files, **kwargs) + prog='heat-engine') + self.register_cli_opts(self.engine_opts) + +FLAGS = HeatEngineConfigOpts() def setup_logging(conf): diff --git a/heat/common/exception.py b/heat/common/exception.py index bf33872984..e806cd497e 100644 --- a/heat/common/exception.py +++ b/heat/common/exception.py @@ -17,6 +17,7 @@ """Heat exception subclasses""" +import functools import urlparse @@ -53,6 +54,81 @@ class HeatException(Exception): def __str__(self): return self._error_string +def wrap_exception(notifier=None, publisher_id=None, event_type=None, + level=None): + """This decorator wraps a method to catch any exceptions that may + get thrown. It logs the exception as well as optionally sending + it to the notification system. + """ + # TODO(sandy): Find a way to import nova.notifier.api so we don't have + # to pass it in as a parameter. 
Otherwise we get a cyclic import of + # nova.notifier.api -> nova.utils -> nova.exception :( + # TODO(johannes): Also, it would be nice to use + # utils.save_and_reraise_exception() without an import loop + def inner(f): + def wrapped(*args, **kw): + try: + return f(*args, **kw) + except Exception, e: + # Save exception since it can be clobbered during processing + # below before we can re-raise + exc_info = sys.exc_info() + + if notifier: + payload = dict(args=args, exception=e) + payload.update(kw) + + # Use a temp vars so we don't shadow + # our outer definitions. + temp_level = level + if not temp_level: + temp_level = notifier.ERROR + + temp_type = event_type + if not temp_type: + # If f has multiple decorators, they must use + # functools.wraps to ensure the name is + # propagated. + temp_type = f.__name__ + + notifier.notify(publisher_id, temp_type, temp_level, + payload) + + # re-raise original exception since it may have been clobbered + raise exc_info[0], exc_info[1], exc_info[2] + + return functools.wraps(f)(wrapped) + return inner + + +class NovaException(Exception): + """Base Nova Exception + + To correctly use this class, inherit from it and define + a 'message' property. That message will get printf'd + with the keyword arguments provided to the constructor. 
+ + """ + message = _("An unknown exception occurred.") + + def __init__(self, message=None, **kwargs): + self.kwargs = kwargs + + if 'code' not in self.kwargs: + try: + self.kwargs['code'] = self.code + except AttributeError: + pass + + if not message: + try: + message = self.message % kwargs + + except Exception as e: + # at least get the core message out if something happened + message = self.message + + super(NovaException, self).__init__(message) class MissingArgumentError(HeatException): message = _("Missing required argument.") diff --git a/heat/common/utils.py b/heat/common/utils.py index 4bd3973f0b..a1c55c73d6 100644 --- a/heat/common/utils.py +++ b/heat/common/utils.py @@ -20,6 +20,7 @@ System-level utilities and helper functions. """ +import datetime import sys import uuid @@ -75,3 +76,14 @@ def import_object(import_str): def generate_uuid(): return str(uuid.uuid4()) + +def gen_uuid(): + return uuid.uuid4() + +def utcnow(): + """Overridable version of utils.utcnow.""" + if utcnow.override_time: + return utcnow.override_time + return datetime.datetime.utcnow() + +utcnow.override_time = None diff --git a/heat/context.py b/heat/context.py new file mode 100644 index 0000000000..4ab7afe29c --- /dev/null +++ b/heat/context.py @@ -0,0 +1,123 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +# License for the specific language governing permissions and limitations +# under the License. + +"""RequestContext: context for requests that persist through all of nova.""" + +import copy +import logging + +from heat import local +from heat.common import utils + + +LOG = logging.getLogger(__name__) + + +def generate_request_id(): + return 'req-' + str(utils.gen_uuid()) + + +class RequestContext(object): + """Security context and request information. + + Represents the user taking a given action within the system. + + """ + + def __init__(self, user_id, project_id, is_admin=None, read_deleted="no", + roles=None, remote_address=None, timestamp=None, + request_id=None, auth_token=None, overwrite=True, **kwargs): + """ + :param read_deleted: 'no' indicates deleted records are hidden, 'yes' + indicates deleted records are visible, 'only' indicates that + *only* deleted records are visible. + + :param overwrite: Set to False to ensure that the greenthread local + copy of the index is not overwritten. + + :param kwargs: Extra arguments that might be present, but we ignore + because they possibly came in from older rpc messages. 
+ """ + if read_deleted not in ('no', 'yes', 'only'): + raise ValueError(_("read_deleted can only be one of 'no', " + "'yes' or 'only', not %r") % read_deleted) + if kwargs: + LOG.warn(_('Arguments dropped when creating context: %s') % + str(kwargs)) + + self.user_id = user_id + self.project_id = project_id + self.roles = roles or [] + self.is_admin = is_admin + if self.is_admin is None: + self.is_admin = 'admin' in [x.lower() for x in self.roles] + elif self.is_admin and 'admin' not in self.roles: + self.roles.append('admin') + self.read_deleted = read_deleted + self.remote_address = remote_address + if not timestamp: + timestamp = utils.utcnow() + if isinstance(timestamp, basestring): + timestamp = utils.parse_strtime(timestamp) + self.timestamp = timestamp + if not request_id: + request_id = generate_request_id() + self.request_id = request_id + self.auth_token = auth_token + if overwrite or not hasattr(local.store, 'context'): + self.update_store() + + def update_store(self): + local.store.context = self + + def to_dict(self): + return {'user_id': self.user_id, + 'project_id': self.project_id, + 'is_admin': self.is_admin, + 'read_deleted': self.read_deleted, + 'roles': self.roles, + 'remote_address': self.remote_address, + 'timestamp': utils.strtime(self.timestamp), + 'request_id': self.request_id, + 'auth_token': self.auth_token} + + @classmethod + def from_dict(cls, values): + return cls(**values) + + def elevated(self, read_deleted=None, overwrite=False): + """Return a version of this context with admin flag set.""" + context = copy.copy(self) + context.is_admin = True + + if 'admin' not in context.roles: + context.roles.append('admin') + + if read_deleted is not None: + context.read_deleted = read_deleted + + return context + + +def get_admin_context(read_deleted="no"): + return RequestContext(user_id=None, + project_id=None, + is_admin=True, + read_deleted=read_deleted, + overwrite=False) diff --git a/heat/engine/manager.py b/heat/engine/manager.py new 
file mode 100644 index 0000000000..1c3887d061 --- /dev/null +++ b/heat/engine/manager.py @@ -0,0 +1,65 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Handles all processes relating to instances (guest vms). + +The :py:class:`ComputeManager` class is a :py:class:`heat.manager.Manager` that +handles RPC calls relating to creating instances. It is responsible for +building a disk image, launching it via the underlying virtualization driver, +responding to calls to check its state, attaching persistent storage, and +terminating it. 
+ +**Related Flags** + +:instances_path: Where instances are kept on disk +:compute_driver: Name of class that is used to handle virtualization, loaded + by :func:`heat.utils.import_object` + +""" + +import contextlib +import functools +import os +import socket +import sys +import tempfile +import time +import traceback +import logging + +from eventlet import greenthread + +import heat.context +from heat import exception +from heat import manager +from heat.openstack.common import cfg +from heat import rpc + +LOG = logging.getLogger(__name__) + + +class EngineManager(manager.Manager): + """Manages the running instances from creation to destruction.""" + + def __init__(self, *args, **kwargs): + """Load configuration options and connect to the hypervisor.""" + + def create(self, template, stack_id): + pass + diff --git a/heat/local.py b/heat/local.py new file mode 100644 index 0000000000..19d962732c --- /dev/null +++ b/heat/local.py @@ -0,0 +1,37 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +"""Greenthread local storage of variables using weak references""" + +import weakref + +from eventlet import corolocal + + +class WeakLocal(corolocal.local): + def __getattribute__(self, attr): + rval = corolocal.local.__getattribute__(self, attr) + if rval: + rval = rval() + return rval + + def __setattr__(self, attr, value): + value = weakref.ref(value) + return corolocal.local.__setattr__(self, attr, value) + + +store = WeakLocal() diff --git a/heat/manager.py b/heat/manager.py new file mode 100644 index 0000000000..3921281841 --- /dev/null +++ b/heat/manager.py @@ -0,0 +1,175 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Base Manager class. + +Managers are responsible for a certain aspect of the system. It is a logical +grouping of code relating to a portion of the system. In general other +components should be using the manager to make changes to the components that +it is responsible for. + +For example, other components that need to deal with volumes in some way, +should do so by calling methods on the VolumeManager instead of directly +changing fields in the database. This allows us to keep all of the code +relating to volumes in the same place. 
+ +We have adopted a basic strategy of Smart managers and dumb data, which means +rather than attaching methods to data objects, components should call manager +methods that act on the data. + +Methods on managers that can be executed locally should be called directly. If +a particular method must execute on a remote host, this should be done via rpc +to the service that wraps the manager + +Managers should be responsible for most of the db access, and +non-implementation specific data. Anything implementation specific that can't +be generalized should be done by the Driver. + +In general, we prefer to have one manager with multiple drivers for different +implementations, but sometimes it makes sense to have multiple managers. You +can think of it this way: Abstract different overall strategies at the manager +level(FlatNetwork vs VlanNetwork), and different implementations at the driver +level(LinuxNetDriver vs CiscoNetDriver). + +Managers will often provide methods for initial setup of a host or periodic +tasks to a wrapping service. + +This module provides Manager, a base class for managers. + +""" + +import logging + +from heat import version +from heat.common import config + +FLAGS = config.FLAGS +LOG = logging.getLogger(__name__) + + +def periodic_task(*args, **kwargs): + """Decorator to indicate that a method is a periodic task. + + This decorator can be used in two ways: + + 1. Without arguments '@periodic_task', this will be run on every tick + of the periodic scheduler. + + 2. With arguments, @periodic_task(ticks_between_runs=N), this will be + run on every N ticks of the periodic scheduler. + """ + def decorator(f): + f._periodic_task = True + f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0) + return f + + # NOTE(sirp): The `if` is necessary to allow the decorator to be used with + # and without parens. 
+ # + # In the 'with-parens' case (with kwargs present), this function needs to + # return a decorator function since the interpreter will invoke it like: + # + # periodic_task(*args, **kwargs)(f) + # + # In the 'without-parens' case, the original function will be passed + # in as the first argument, like: + # + # periodic_task(f) + if kwargs: + return decorator + else: + return decorator(args[0]) + + +class ManagerMeta(type): + def __init__(cls, names, bases, dict_): + """Metaclass that allows us to collect decorated periodic tasks.""" + super(ManagerMeta, cls).__init__(names, bases, dict_) + + # NOTE(sirp): if the attribute is not present then we must be the base + # class, so, go ahead an initialize it. If the attribute is present, + # then we're a subclass so make a copy of it so we don't step on our + # parent's toes. + try: + cls._periodic_tasks = cls._periodic_tasks[:] + except AttributeError: + cls._periodic_tasks = [] + + try: + cls._ticks_to_skip = cls._ticks_to_skip.copy() + except AttributeError: + cls._ticks_to_skip = {} + + for value in cls.__dict__.values(): + if getattr(value, '_periodic_task', False): + task = value + name = task.__name__ + cls._periodic_tasks.append((name, task)) + cls._ticks_to_skip[name] = task._ticks_between_runs + + +class Manager(object): + __metaclass__ = ManagerMeta + + def __init__(self, host=None, db_driver=None): + if not host: + host = FLAGS.host + self.host = host + super(Manager, self).__init__(db_driver) + + def periodic_tasks(self, context, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + for task_name, task in self._periodic_tasks: + full_task_name = '.'.join([self.__class__.__name__, task_name]) + + ticks_to_skip = self._ticks_to_skip[task_name] + if ticks_to_skip > 0: + LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s" + " ticks left until next run"), locals()) + self._ticks_to_skip[task_name] -= 1 + continue + + self._ticks_to_skip[task_name] = task._ticks_between_runs + 
LOG.debug(_("Running periodic task %(full_task_name)s"), locals()) + + try: + task(self, context) + except Exception as e: + if raise_on_error: + raise + LOG.exception(_("Error during %(full_task_name)s: %(e)s"), + locals()) + + def init_host(self): + """Handle initialization if this is a standalone service. + + Child classes should override this method. + + """ + pass + + def service_version(self, context): + return version.version_string() + + def service_config(self, context): + config = {} + for key in FLAGS: + config[key] = FLAGS.get(key, None) + return config + + diff --git a/heat/rpc/__init__.py b/heat/rpc/__init__.py new file mode 100644 index 0000000000..6ad0992c22 --- /dev/null +++ b/heat/rpc/__init__.py @@ -0,0 +1,202 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from heat.openstack.common import cfg +from heat.common import utils +from heat.common import config + + +rpc_backend_opt = cfg.StrOpt('rpc_backend', + default='heat.rpc.impl_qpid', + help="The messaging module to use, defaults to kombu.") + +FLAGS = config.FLAGS +FLAGS.register_opt(rpc_backend_opt) + + +def create_connection(new=True): + """Create a connection to the message bus used for rpc. 
+ + For some example usage of creating a connection and some consumers on that + connection, see nova.service. + + :param new: Whether or not to create a new connection. A new connection + will be created by default. If new is False, the + implementation is free to return an existing connection from a + pool. + + :returns: An instance of nova.rpc.common.Connection + """ + return _get_impl().create_connection(new=new) + + +def call(context, topic, msg, timeout=None): + """Invoke a remote method that returns something. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + nova.rpc.common.Connection.create_consumer() and only applies + when the consumer was created with fanout=False. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + :param timeout: int, number of seconds to use for a response timeout. + If set, this overrides the rpc_response_timeout option. + + :returns: A dict from the remote method. + + :raises: nova.rpc.common.Timeout if a complete response is not received + before the timeout is reached. + """ + return _get_impl().call(context, topic, msg, timeout) + + +def cast(context, topic, msg): + """Invoke a remote method that does not return anything. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + nova.rpc.common.Connection.create_consumer() and only applies + when the consumer was created with fanout=False. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().cast(context, topic, msg) + + +def fanout_cast(context, topic, msg): + """Broadcast a remote method invocation with no return. 
+ + This method will get invoked on all consumers that were set up with this + topic name and fanout=True. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + nova.rpc.common.Connection.create_consumer() and only applies + when the consumer was created with fanout=True. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().fanout_cast(context, topic, msg) + + +def multicall(context, topic, msg, timeout=None): + """Invoke a remote method and get back an iterator. + + In this case, the remote method will be returning multiple values in + separate messages, so the return values can be processed as the come in via + an iterator. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + nova.rpc.common.Connection.create_consumer() and only applies + when the consumer was created with fanout=False. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + :param timeout: int, number of seconds to use for a response timeout. + If set, this overrides the rpc_response_timeout option. + + :returns: An iterator. The iterator will yield a tuple (N, X) where N is + an index that starts at 0 and increases by one for each value + returned and X is the Nth value that was returned by the remote + method. + + :raises: nova.rpc.common.Timeout if a complete response is not received + before the timeout is reached. + """ + return _get_impl().multicall(context, topic, msg, timeout) + + +def notify(context, topic, msg): + """Send notification event. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the notification to. 
+ :param msg: This is a dict of content of event. + + :returns: None + """ + return _get_impl().notify(context, topic, msg) + + +def cleanup(): + """Clean up resoruces in use by implementation. + + Clean up any resources that have been allocated by the RPC implementation. + This is typically open connections to a messaging service. This function + would get called before an application using this API exits to allow + connections to get torn down cleanly. + + :returns: None + """ + return _get_impl().cleanup() + + +def cast_to_server(context, server_params, topic, msg): + """Invoke a remote method that does not return anything. + + :param context: Information that identifies the user that has made this + request. + :param server_params: Connection information + :param topic: The topic to send the notification to. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().cast_to_server(context, server_params, topic, msg) + + +def fanout_cast_to_server(context, server_params, topic, msg): + """Broadcast to a remote method invocation with no return. + + :param context: Information that identifies the user that has made this + request. + :param server_params: Connection information + :param topic: The topic to send the notification to. 
+ :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().fanout_cast_to_server(context, server_params, topic, + msg) + + +_RPCIMPL = None + + +def _get_impl(): + """Delay import of rpc_backend until FLAGS are loaded.""" + global _RPCIMPL + if _RPCIMPL is None: + _RPCIMPL = utils.import_object(FLAGS.rpc_backend) + return _RPCIMPL diff --git a/heat/rpc/amqp.py b/heat/rpc/amqp.py new file mode 100644 index 0000000000..b4e548b3ca --- /dev/null +++ b/heat/rpc/amqp.py @@ -0,0 +1,384 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 - 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Shared code between AMQP based nova.rpc implementations. + +The code in this module is shared between the rpc implemenations based on AMQP. +Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses +AMQP, but is deprecated and predates this code. 
+""" + +import inspect +import logging +import sys +import traceback +import uuid + +from eventlet import greenpool +from eventlet import pools + +from heat import context +from heat.common import exception +from heat.common import config +from heat import local +import heat.rpc.common as rpc_common + +LOG = logging.getLogger(__name__) +FLAGS = config.FLAGS + + +class Pool(pools.Pool): + """Class that implements a Pool of Connections.""" + def __init__(self, *args, **kwargs): + self.connection_cls = kwargs.pop("connection_cls", None) + kwargs.setdefault("max_size", FLAGS.rpc_conn_pool_size) + kwargs.setdefault("order_as_stack", True) + super(Pool, self).__init__(*args, **kwargs) + + # TODO(comstud): Timeout connections not used in a while + def create(self): + LOG.debug('Pool creating new connection') + return self.connection_cls() + + def empty(self): + while self.free_items: + self.get().close() + + +class ConnectionContext(rpc_common.Connection): + """The class that is actually returned to the caller of + create_connection(). This is a essentially a wrapper around + Connection that supports 'with' and can return a new Connection or + one from a pool. It will also catch when an instance of this class + is to be deleted so that we can return Connections to the pool on + exceptions and so forth without making the caller be responsible for + catching all exceptions and making sure to return a connection to + the pool. 
+ """ + + def __init__(self, connection_pool, pooled=True, server_params=None): + """Create a new connection, or get one from the pool""" + self.connection = None + self.connection_pool = connection_pool + if pooled: + self.connection = connection_pool.get() + else: + self.connection = connection_pool.connection_cls( + server_params=server_params) + self.pooled = pooled + + def __enter__(self): + """When with ConnectionContext() is used, return self""" + return self + + def _done(self): + """If the connection came from a pool, clean it up and put it back. + If it did not come from a pool, close it. + """ + if self.connection: + if self.pooled: + # Reset the connection so it's ready for the next caller + # to grab from the pool + self.connection.reset() + self.connection_pool.put(self.connection) + else: + try: + self.connection.close() + except Exception: + pass + self.connection = None + + def __exit__(self, exc_type, exc_value, tb): + """End of 'with' statement. We're done here.""" + self._done() + + def __del__(self): + """Caller is done with this connection. Make sure we cleaned up.""" + self._done() + + def close(self): + """Caller is done with this connection.""" + self._done() + + def create_consumer(self, topic, proxy, fanout=False): + self.connection.create_consumer(topic, proxy, fanout) + + def consume_in_thread(self): + self.connection.consume_in_thread() + + def __getattr__(self, key): + """Proxy all other calls to the Connection instance""" + if self.connection: + return getattr(self.connection, key) + else: + raise exception.InvalidRPCConnectionReuse() + + +def msg_reply(msg_id, connection_pool, reply=None, failure=None, ending=False): + """Sends a reply or an error on the channel signified by msg_id. + + Failure should be a sys.exc_info() tuple. 
+ + """ + with ConnectionContext(connection_pool) as conn: + if failure: + message = str(failure[1]) + tb = traceback.format_exception(*failure) + LOG.error(_("Returning exception %s to caller"), message) + LOG.error(tb) + failure = (failure[0].__name__, str(failure[1]), tb) + + try: + msg = {'result': reply, 'failure': failure} + except TypeError: + msg = {'result': dict((k, repr(v)) + for k, v in reply.__dict__.iteritems()), + 'failure': failure} + if ending: + msg['ending'] = True + conn.direct_send(msg_id, msg) + + +class RpcContext(context.RequestContext): + """Context that supports replying to a rpc.call""" + def __init__(self, *args, **kwargs): + self.msg_id = kwargs.pop('msg_id', None) + super(RpcContext, self).__init__(*args, **kwargs) + + def reply(self, reply=None, failure=None, ending=False, + connection_pool=None): + if self.msg_id: + msg_reply(self.msg_id, connection_pool, reply, failure, + ending) + if ending: + self.msg_id = None + + +def unpack_context(msg): + """Unpack context from msg.""" + context_dict = {} + for key in list(msg.keys()): + # NOTE(vish): Some versions of python don't like unicode keys + # in kwargs. + key = str(key) + if key.startswith('_context_'): + value = msg.pop(key) + context_dict[key[9:]] = value + context_dict['msg_id'] = msg.pop('_msg_id', None) + ctx = RpcContext.from_dict(context_dict) + LOG.debug(_('unpacked context: %s'), ctx.to_dict()) + return ctx + + +def pack_context(msg, context): + """Pack context into msg. + + Values for message keys need to be less than 255 chars, so we pull + context out into a bunch of separate keys. If we want to support + more arguments in rabbit messages, we may want to do the same + for args at some point. 
+ + """ + context_d = dict([('_context_%s' % key, value) + for (key, value) in context.to_dict().iteritems()]) + msg.update(context_d) + + +class ProxyCallback(object): + """Calls methods on a proxy object based on method and args.""" + + def __init__(self, proxy, connection_pool): + self.proxy = proxy + self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size) + self.connection_pool = connection_pool + + def __call__(self, message_data): + """Consumer callback to call a method on a proxy object. + + Parses the message for validity and fires off a thread to call the + proxy object method. + + Message data should be a dictionary with two keys: + method: string representing the method to call + args: dictionary of arg: value + + Example: {'method': 'echo', 'args': {'value': 42}} + + """ + # It is important to clear the context here, because at this point + # the previous context is stored in local.store.context + if hasattr(local.store, 'context'): + del local.store.context + rpc_common._safe_log(LOG.debug, _('received %s'), message_data) + ctxt = unpack_context(message_data) + method = message_data.get('method') + args = message_data.get('args', {}) + if not method: + LOG.warn(_('no method for message: %s') % message_data) + ctxt.reply(_('No method for message: %s') % message_data, + connection_pool=self.connection_pool) + return + self.pool.spawn_n(self._process_data, ctxt, method, args) + + @exception.wrap_exception() + def _process_data(self, ctxt, method, args): + """Thread that magically looks for a method on the proxy + object and calls it. + """ + ctxt.update_store() + try: + node_func = getattr(self.proxy, str(method)) + node_args = dict((str(k), v) for k, v in args.iteritems()) + # NOTE(vish): magic is fun! 
+ rval = node_func(context=ctxt, **node_args) + # Check if the result was a generator + if inspect.isgenerator(rval): + for x in rval: + ctxt.reply(x, None, connection_pool=self.connection_pool) + else: + ctxt.reply(rval, None, connection_pool=self.connection_pool) + # This final None tells multicall that it is done. + ctxt.reply(ending=True, connection_pool=self.connection_pool) + except Exception as e: + LOG.exception('Exception during message handling') + ctxt.reply(None, sys.exc_info(), + connection_pool=self.connection_pool) + return + + +class MulticallWaiter(object): + def __init__(self, connection, timeout): + self._connection = connection + self._iterator = connection.iterconsume( + timeout=timeout or FLAGS.rpc_response_timeout) + self._result = None + self._done = False + self._got_ending = False + + def done(self): + if self._done: + return + self._done = True + self._iterator.close() + self._iterator = None + self._connection.close() + + def __call__(self, data): + """The consume() callback will call this. Store the result.""" + if data['failure']: + self._result = rpc_common.RemoteError(*data['failure']) + elif data.get('ending', False): + self._got_ending = True + else: + self._result = data['result'] + + def __iter__(self): + """Return a result until we get a 'None' response from consumer""" + if self._done: + raise StopIteration + while True: + self._iterator.next() + if self._got_ending: + self.done() + raise StopIteration + result = self._result + if isinstance(result, Exception): + self.done() + raise result + yield result + + +def create_connection(new, connection_pool): + """Create a connection""" + return ConnectionContext(connection_pool, pooled=not new) + + +def multicall(context, topic, msg, timeout, connection_pool): + """Make a call that returns multiple times.""" + # Can't use 'with' for multicall, as it returns an iterator + # that will continue to use the connection. 
When it's done, + # connection.close() will get called which will put it back into + # the pool + LOG.debug(_('Making asynchronous call on %s ...'), topic) + msg_id = uuid.uuid4().hex + msg.update({'_msg_id': msg_id}) + LOG.debug(_('MSG_ID is %s') % (msg_id)) + pack_context(msg, context) + + conn = ConnectionContext(connection_pool) + wait_msg = MulticallWaiter(conn, timeout) + conn.declare_direct_consumer(msg_id, wait_msg) + conn.topic_send(topic, msg) + return wait_msg + + +def call(context, topic, msg, timeout, connection_pool): + """Sends a message on a topic and wait for a response.""" + rv = multicall(context, topic, msg, timeout, connection_pool) + # NOTE(vish): return the last result from the multicall + rv = list(rv) + if not rv: + return + return rv[-1] + + +def cast(context, topic, msg, connection_pool): + """Sends a message on a topic without waiting for a response.""" + LOG.debug(_('Making asynchronous cast on %s...'), topic) + pack_context(msg, context) + with ConnectionContext(connection_pool) as conn: + conn.topic_send(topic, msg) + + +def fanout_cast(context, topic, msg, connection_pool): + """Sends a message on a fanout exchange without waiting for a response.""" + LOG.debug(_('Making asynchronous fanout cast...')) + pack_context(msg, context) + with ConnectionContext(connection_pool) as conn: + conn.fanout_send(topic, msg) + + +def cast_to_server(context, server_params, topic, msg, connection_pool): + """Sends a message on a topic to a specific server.""" + pack_context(msg, context) + with ConnectionContext(connection_pool, pooled=False, + server_params=server_params) as conn: + conn.topic_send(topic, msg) + + +def fanout_cast_to_server(context, server_params, topic, msg, + connection_pool): + """Sends a message on a fanout exchange to a specific server.""" + pack_context(msg, context) + with ConnectionContext(connection_pool, pooled=False, + server_params=server_params) as conn: + conn.fanout_send(topic, msg) + + +def notify(context, topic, 
msg, connection_pool): + """Sends a notification event on a topic.""" + LOG.debug(_('Sending notification on %s...'), topic) + pack_context(msg, context) + with ConnectionContext(connection_pool) as conn: + conn.notify_send(topic, msg) + + +def cleanup(connection_pool): + connection_pool.empty() diff --git a/heat/rpc/common.py b/heat/rpc/common.py new file mode 100644 index 0000000000..a425a20430 --- /dev/null +++ b/heat/rpc/common.py @@ -0,0 +1,144 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import logging + +from heat import exception +from heat.openstack.common import cfg +from heat.common import config + + +LOG = logging.getLogger(__name__) + +rpc_opts = [ + cfg.IntOpt('rpc_thread_pool_size', + default=1024, + help='Size of RPC thread pool'), + cfg.IntOpt('rpc_conn_pool_size', + default=30, + help='Size of RPC connection pool'), + cfg.IntOpt('rpc_response_timeout', + default=60, + help='Seconds to wait for a response from call or multicall'), + ] + +config.FLAGS.register_opts(rpc_opts) + + +class RemoteError(exception.NovaException): + """Signifies that a remote class has raised an exception. 
+ + Contains a string representation of the type of the original exception, + the value of the original exception, and the traceback. These are + sent to the parent as a joined string so printing the exception + contains all of the relevant info. + + """ + message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") + + def __init__(self, exc_type=None, value=None, traceback=None): + self.exc_type = exc_type + self.value = value + self.traceback = traceback + super(RemoteError, self).__init__(exc_type=exc_type, + value=value, + traceback=traceback) + + +class Timeout(exception.NovaException): + """Signifies that a timeout has occurred. + + This exception is raised if the rpc_response_timeout is reached while + waiting for a response from the remote side. + """ + message = _("Timeout while waiting on RPC response.") + + +class Connection(object): + """A connection, returned by rpc.create_connection(). + + This class represents a connection to the message bus used for rpc. + An instance of this class should never be created by users of the rpc API. + Use rpc.create_connection() instead. + """ + def close(self): + """Close the connection. + + This method must be called when the connection will no longer be used. + It will ensure that any resources associated with the connection, such + as a network connection, and cleaned up. + """ + raise NotImplementedError() + + def create_consumer(self, topic, proxy, fanout=False): + """Create a consumer on this connection. + + A consumer is associated with a message queue on the backend message + bus. The consumer will read messages from the queue, unpack them, and + dispatch them to the proxy object. The contents of the message pulled + off of the queue will determine which method gets called on the proxy + object. + + :param topic: This is a name associated with what to consume from. + Multiple instances of a service may consume from the same + topic. 
For example, all instances of nova-compute consume + from a queue called "compute". In that case, the + messages will get distributed amongst the consumers in a + round-robin fashion if fanout=False. If fanout=True, + every consumer associated with this topic will get a + copy of every message. + :param proxy: The object that will handle all incoming messages. + :param fanout: Whether or not this is a fanout topic. See the + documentation for the topic parameter for some + additional comments on this. + """ + raise NotImplementedError() + + def consume_in_thread(self): + """Spawn a thread to handle incoming messages. + + Spawn a thread that will be responsible for handling all incoming + messages for consumers that were set up on this connection. + + Message dispatching inside of this is expected to be implemented in a + non-blocking manner. An example implementation would be having this + thread pull messages in for all of the consumers, but utilize a thread + pool for dispatching the messages to the proxy objects. + """ + raise NotImplementedError() + + +def _safe_log(log_func, msg, msg_data): + """Sanitizes the msg_data field before logging.""" + SANITIZE = { + 'set_admin_password': ('new_pass',), + 'run_instance': ('admin_password',), + } + method = msg_data['method'] + if method in SANITIZE: + msg_data = copy.deepcopy(msg_data) + args_to_sanitize = SANITIZE[method] + for arg in args_to_sanitize: + try: + msg_data['args'][arg] = "" + except KeyError: + pass + + return log_func(msg, msg_data) diff --git a/heat/rpc/impl_fake.py b/heat/rpc/impl_fake.py new file mode 100644 index 0000000000..a6c2aba448 --- /dev/null +++ b/heat/rpc/impl_fake.py @@ -0,0 +1,188 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Fake RPC implementation which calls proxy methods directly with no +queues. Casts will block, but this is very useful for tests. +""" + +import inspect +import json +import signal +import sys +import time +import traceback + +import eventlet + +from heat import context +from heat.common import config +from heat.rpc import common as rpc_common + +CONSUMERS = {} + +FLAGS = config.FLAGS + + +class RpcContext(context.RequestContext): + def __init__(self, *args, **kwargs): + super(RpcContext, self).__init__(*args, **kwargs) + self._response = [] + self._done = False + + def reply(self, reply=None, failure=None, ending=False): + if ending: + self._done = True + if not self._done: + self._response.append((reply, failure)) + + +class Consumer(object): + def __init__(self, topic, proxy): + self.topic = topic + self.proxy = proxy + + def call(self, context, method, args, timeout): + node_func = getattr(self.proxy, method) + node_args = dict((str(k), v) for k, v in args.iteritems()) + done = eventlet.event.Event() + + def _inner(): + ctxt = RpcContext.from_dict(context.to_dict()) + try: + rval = node_func(context=ctxt, **node_args) + res = [] + # Caller might have called ctxt.reply() manually + for (reply, failure) in ctxt._response: + if failure: + raise failure[0], failure[1], failure[2] + res.append(reply) + # if ending not 'sent'...we might have more data to + # return from the function itself + if not ctxt._done: + if inspect.isgenerator(rval): + for val in rval: + res.append(val) + else: + res.append(rval) + done.send(res) + except Exception: + exc_info = 
sys.exc_info() + done.send_exception( + rpc_common.RemoteError(exc_info[0].__name__, + str(exc_info[1]), + ''.join(traceback.format_exception(*exc_info)))) + + thread = eventlet.greenthread.spawn(_inner) + + if timeout: + start_time = time.time() + while not done.ready(): + eventlet.greenthread.sleep(1) + cur_time = time.time() + if (cur_time - start_time) > timeout: + thread.kill() + raise rpc_common.Timeout() + + return done.wait() + + +class Connection(object): + """Connection object.""" + + def __init__(self): + self.consumers = [] + + def create_consumer(self, topic, proxy, fanout=False): + consumer = Consumer(topic, proxy) + self.consumers.append(consumer) + if topic not in CONSUMERS: + CONSUMERS[topic] = [] + CONSUMERS[topic].append(consumer) + + def close(self): + for consumer in self.consumers: + CONSUMERS[consumer.topic].remove(consumer) + self.consumers = [] + + def consume_in_thread(self): + pass + + +def create_connection(new=True): + """Create a connection""" + return Connection() + + +def check_serialize(msg): + """Make sure a message intended for rpc can be serialized.""" + json.dumps(msg) + + +def multicall(context, topic, msg, timeout=None): + """Make a call that returns multiple times.""" + + check_serialize(msg) + + method = msg.get('method') + if not method: + return + args = msg.get('args', {}) + + try: + consumer = CONSUMERS[topic][0] + except (KeyError, IndexError): + return iter([None]) + else: + return consumer.call(context, method, args, timeout) + + +def call(context, topic, msg, timeout=None): + """Sends a message on a topic and wait for a response.""" + rv = multicall(context, topic, msg, timeout) + # NOTE(vish): return the last result from the multicall + rv = list(rv) + if not rv: + return + return rv[-1] + + +def cast(context, topic, msg): + try: + call(context, topic, msg) + except rpc_common.RemoteError: + pass + + +def notify(context, topic, msg): + check_serialize(msg) + + +def cleanup(): + pass + + +def fanout_cast(context, 
topic, msg): + """Cast to all consumers of a topic""" + check_serialize(msg) + method = msg.get('method') + if not method: + return + args = msg.get('args', {}) + + for consumer in CONSUMERS.get(topic, []): + try: + consumer.call(context, method, args, None) + except rpc_common.RemoteError: + pass diff --git a/heat/rpc/impl_kombu.py b/heat/rpc/impl_kombu.py new file mode 100644 index 0000000000..7199273a1b --- /dev/null +++ b/heat/rpc/impl_kombu.py @@ -0,0 +1,705 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import itertools +import socket +import ssl +import sys +import time +import uuid + +import eventlet +import greenlet +import kombu +import kombu.entity +import kombu.messaging +import kombu.connection + +from heat.common import config +from heat.openstack.common import cfg +from heat.rpc import amqp as rpc_amqp +from heat.rpc import common as rpc_common + +kombu_opts = [ + cfg.StrOpt('kombu_ssl_version', + default='', + help='SSL version to use (valid only if SSL enabled)'), + cfg.StrOpt('kombu_ssl_keyfile', + default='', + help='SSL key file (valid only if SSL enabled)'), + cfg.StrOpt('kombu_ssl_certfile', + default='', + help='SSL cert file (valid only if SSL enabled)'), + cfg.StrOpt('kombu_ssl_ca_certs', + default='', + help=('SSL certification authority file ' + '(valid only if SSL enabled)')), + ] + +FLAGS = config.FLAGS +FLAGS.register_opts(kombu_opts) +LOG = rpc_common.LOG + + +class ConsumerBase(object): + """Consumer base class.""" + + def __init__(self, channel, callback, tag, **kwargs): + """Declare a queue on an amqp channel. + + 'channel' is the amqp channel to use + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + queue name, exchange name, and other kombu options are + passed in here as a dictionary. + """ + self.callback = callback + self.tag = str(tag) + self.kwargs = kwargs + self.queue = None + self.reconnect(channel) + + def reconnect(self, channel): + """Re-declare the queue after a rabbit reconnect""" + self.channel = channel + self.kwargs['channel'] = channel + self.queue = kombu.entity.Queue(**self.kwargs) + self.queue.declare() + + def consume(self, *args, **kwargs): + """Actually declare the consumer on the amqp channel. This will + start the flow of messages from the queue. Using the + Connection.iterconsume() iterator will process the messages, + calling the appropriate callback. + + If a callback is specified in kwargs, use that. 
Otherwise, + use the callback passed during __init__() + + If kwargs['nowait'] is True, then this call will block until + a message is read. + + Messages will automatically be acked if the callback doesn't + raise an exception + """ + + options = {'consumer_tag': self.tag} + options['nowait'] = kwargs.get('nowait', False) + callback = kwargs.get('callback', self.callback) + if not callback: + raise ValueError("No callback defined") + + def _callback(raw_message): + message = self.channel.message_to_python(raw_message) + try: + callback(message.payload) + message.ack() + except Exception: + LOG.exception(_("Failed to process message... skipping it.")) + + self.queue.consume(*args, callback=_callback, **options) + + def cancel(self): + """Cancel the consuming from the queue, if it has started""" + try: + self.queue.cancel(self.tag) + except KeyError, e: + # NOTE(comstud): Kludge to get around a amqplib bug + if str(e) != "u'%s'" % self.tag: + raise + self.queue = None + + +class DirectConsumer(ConsumerBase): + """Queue/consumer class for 'direct'""" + + def __init__(self, channel, msg_id, callback, tag, **kwargs): + """Init a 'direct' queue. + + 'channel' is the amqp channel to use + 'msg_id' is the msg_id to listen on + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + Other kombu options may be passed + """ + # Default options + options = {'durable': False, + 'auto_delete': True, + 'exclusive': True} + options.update(kwargs) + exchange = kombu.entity.Exchange( + name=msg_id, + type='direct', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(DirectConsumer, self).__init__( + channel, + callback, + tag, + name=msg_id, + exchange=exchange, + routing_key=msg_id, + **options) + + +class TopicConsumer(ConsumerBase): + """Consumer class for 'topic'""" + + def __init__(self, channel, topic, callback, tag, **kwargs): + """Init a 'topic' queue. 
+ + 'channel' is the amqp channel to use + 'topic' is the topic to listen on + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + Other kombu options may be passed + """ + # Default options + options = {'durable': FLAGS.rabbit_durable_queues, + 'auto_delete': False, + 'exclusive': False} + options.update(kwargs) + exchange = kombu.entity.Exchange( + name=FLAGS.control_exchange, + type='topic', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(TopicConsumer, self).__init__( + channel, + callback, + tag, + name=topic, + exchange=exchange, + routing_key=topic, + **options) + + +class FanoutConsumer(ConsumerBase): + """Consumer class for 'fanout'""" + + def __init__(self, channel, topic, callback, tag, **kwargs): + """Init a 'fanout' queue. + + 'channel' is the amqp channel to use + 'topic' is the topic to listen on + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + Other kombu options may be passed + """ + unique = uuid.uuid4().hex + exchange_name = '%s_fanout' % topic + queue_name = '%s_fanout_%s' % (topic, unique) + + # Default options + options = {'durable': False, + 'auto_delete': True, + 'exclusive': True} + options.update(kwargs) + exchange = kombu.entity.Exchange( + name=exchange_name, + type='fanout', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(FanoutConsumer, self).__init__( + channel, + callback, + tag, + name=queue_name, + exchange=exchange, + routing_key=topic, + **options) + + +class Publisher(object): + """Base Publisher class""" + + def __init__(self, channel, exchange_name, routing_key, **kwargs): + """Init the Publisher class with the exchange_name, routing_key, + and other options + """ + self.exchange_name = exchange_name + self.routing_key = routing_key + self.kwargs = kwargs + self.reconnect(channel) + + def reconnect(self, channel): + """Re-establish 
the Producer after a rabbit reconnection""" + self.exchange = kombu.entity.Exchange(name=self.exchange_name, + **self.kwargs) + self.producer = kombu.messaging.Producer(exchange=self.exchange, + channel=channel, routing_key=self.routing_key) + + def send(self, msg): + """Send a message""" + self.producer.publish(msg) + + +class DirectPublisher(Publisher): + """Publisher class for 'direct'""" + def __init__(self, channel, msg_id, **kwargs): + """init a 'direct' publisher. + + Kombu options may be passed as keyword args to override defaults + """ + + options = {'durable': False, + 'auto_delete': True, + 'exclusive': True} + options.update(kwargs) + super(DirectPublisher, self).__init__(channel, + msg_id, + msg_id, + type='direct', + **options) + + +class TopicPublisher(Publisher): + """Publisher class for 'topic'""" + def __init__(self, channel, topic, **kwargs): + """init a 'topic' publisher. + + Kombu options may be passed as keyword args to override defaults + """ + options = {'durable': FLAGS.rabbit_durable_queues, + 'auto_delete': False, + 'exclusive': False} + options.update(kwargs) + super(TopicPublisher, self).__init__(channel, + FLAGS.control_exchange, + topic, + type='topic', + **options) + + +class FanoutPublisher(Publisher): + """Publisher class for 'fanout'""" + def __init__(self, channel, topic, **kwargs): + """init a 'fanout' publisher. 
+ + Kombu options may be passed as keyword args to override defaults + """ + options = {'durable': False, + 'auto_delete': True, + 'exclusive': True} + options.update(kwargs) + super(FanoutPublisher, self).__init__(channel, + '%s_fanout' % topic, + None, + type='fanout', + **options) + + +class NotifyPublisher(TopicPublisher): + """Publisher class for 'notify'""" + + def __init__(self, *args, **kwargs): + self.durable = kwargs.pop('durable', FLAGS.rabbit_durable_queues) + super(NotifyPublisher, self).__init__(*args, **kwargs) + + def reconnect(self, channel): + super(NotifyPublisher, self).reconnect(channel) + + # NOTE(jerdfelt): Normally the consumer would create the queue, but + # we do this to ensure that messages don't get dropped if the + # consumer is started after we do + queue = kombu.entity.Queue(channel=channel, + exchange=self.exchange, + durable=self.durable, + name=self.routing_key, + routing_key=self.routing_key) + queue.declare() + + +class Connection(object): + """Connection object.""" + + def __init__(self, server_params=None): + self.consumers = [] + self.consumer_thread = None + self.max_retries = FLAGS.rabbit_max_retries + # Try forever? 
+ if self.max_retries <= 0: + self.max_retries = None + self.interval_start = FLAGS.rabbit_retry_interval + self.interval_stepping = FLAGS.rabbit_retry_backoff + # max retry-interval = 30 seconds + self.interval_max = 30 + self.memory_transport = False + + if server_params is None: + server_params = {} + + # Keys to translate from server_params to kombu params + server_params_to_kombu_params = {'username': 'userid'} + + params = {} + for sp_key, value in server_params.iteritems(): + p_key = server_params_to_kombu_params.get(sp_key, sp_key) + params[p_key] = value + + params.setdefault('hostname', FLAGS.rabbit_host) + params.setdefault('port', FLAGS.rabbit_port) + params.setdefault('userid', FLAGS.rabbit_userid) + params.setdefault('password', FLAGS.rabbit_password) + params.setdefault('virtual_host', FLAGS.rabbit_virtual_host) + + self.params = params + + if FLAGS.fake_rabbit: + self.params['transport'] = 'memory' + self.memory_transport = True + else: + self.memory_transport = False + + if FLAGS.rabbit_use_ssl: + self.params['ssl'] = self._fetch_ssl_params() + + self.connection = None + self.reconnect() + + def _fetch_ssl_params(self): + """Handles fetching what ssl params + should be used for the connection (if any)""" + ssl_params = dict() + + # http://docs.python.org/library/ssl.html - ssl.wrap_socket + if FLAGS.kombu_ssl_version: + ssl_params['ssl_version'] = FLAGS.kombu_ssl_version + if FLAGS.kombu_ssl_keyfile: + ssl_params['keyfile'] = FLAGS.kombu_ssl_keyfile + if FLAGS.kombu_ssl_certfile: + ssl_params['certfile'] = FLAGS.kombu_ssl_certfile + if FLAGS.kombu_ssl_ca_certs: + ssl_params['ca_certs'] = FLAGS.kombu_ssl_ca_certs + # We might want to allow variations in the + # future with this? + ssl_params['cert_reqs'] = ssl.CERT_REQUIRED + + if not ssl_params: + # Just have the default behavior + return True + else: + # Return the extended behavior + return ssl_params + + def _connect(self): + """Connect to rabbit. 
Re-establish any queues that may have + been declared before if we are reconnecting. Exceptions should + be handled by the caller. + """ + if self.connection: + LOG.info(_("Reconnecting to AMQP server on " + "%(hostname)s:%(port)d") % self.params) + try: + self.connection.close() + except self.connection_errors: + pass + # Setting this in case the next statement fails, though + # it shouldn't be doing any network operations, yet. + self.connection = None + self.connection = kombu.connection.BrokerConnection( + **self.params) + self.connection_errors = self.connection.connection_errors + if self.memory_transport: + # Kludge to speed up tests. + self.connection.transport.polling_interval = 0.0 + self.consumer_num = itertools.count(1) + self.connection.connect() + self.channel = self.connection.channel() + # work around 'memory' transport bug in 1.1.3 + if self.memory_transport: + self.channel._new_queue('ae.undeliver') + for consumer in self.consumers: + consumer.reconnect(self.channel) + LOG.info(_('Connected to AMQP server on ' + '%(hostname)s:%(port)d') % self.params) + + def reconnect(self): + """Handles reconnecting and re-establishing queues. + Will retry up to self.max_retries number of times. + self.max_retries = 0 means to retry forever. + Sleep between tries, starting at self.interval_start + seconds, backing off self.interval_stepping number of seconds + each attempt. + """ + + attempt = 0 + while True: + attempt += 1 + try: + self._connect() + return + except self.connection_errors, e: + pass + except Exception, e: + # NOTE(comstud): Unfortunately it's possible for amqplib + # to return an error not covered by its transport + # connection_errors in the case of a timeout waiting for + # a protocol response. (See paste link in LP888621) + # So, we check all exceptions for 'timeout' in them + # and try to reconnect in this case. 
+ if 'timeout' not in str(e): + raise + + log_info = {} + log_info['err_str'] = str(e) + log_info['max_retries'] = self.max_retries + log_info.update(self.params) + + if self.max_retries and attempt == self.max_retries: + LOG.exception(_('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info) + # NOTE(comstud): Copied from original code. There's + # really no better recourse because if this was a queue we + # need to consume on, we have no way to consume anymore. + sys.exit(1) + + if attempt == 1: + sleep_time = self.interval_start or 1 + elif attempt > 1: + sleep_time += self.interval_stepping + if self.interval_max: + sleep_time = min(sleep_time, self.interval_max) + + log_info['sleep_time'] = sleep_time + LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' + ' unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) + time.sleep(sleep_time) + + def ensure(self, error_callback, method, *args, **kwargs): + while True: + try: + return method(*args, **kwargs) + except (self.connection_errors, socket.timeout), e: + pass + except Exception, e: + # NOTE(comstud): Unfortunately it's possible for amqplib + # to return an error not covered by its transport + # connection_errors in the case of a timeout waiting for + # a protocol response. (See paste link in LP888621) + # So, we check all exceptions for 'timeout' in them + # and try to reconnect in this case. 
+ if 'timeout' not in str(e): + raise + if error_callback: + error_callback(e) + self.reconnect() + + def get_channel(self): + """Convenience call for bin/clear_rabbit_queues""" + return self.channel + + def close(self): + """Close/release this connection""" + self.cancel_consumer_thread() + self.connection.release() + self.connection = None + + def reset(self): + """Reset a connection so it can be used again""" + self.cancel_consumer_thread() + self.channel.close() + self.channel = self.connection.channel() + # work around 'memory' transport bug in 1.1.3 + if self.memory_transport: + self.channel._new_queue('ae.undeliver') + self.consumers = [] + + def declare_consumer(self, consumer_cls, topic, callback): + """Create a Consumer using the class that was passed in and + add it to our list of consumers + """ + + def _connect_error(exc): + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.error(_("Failed to declare consumer for topic '%(topic)s': " + "%(err_str)s") % log_info) + + def _declare_consumer(): + consumer = consumer_cls(self.channel, topic, callback, + self.consumer_num.next()) + self.consumers.append(consumer) + return consumer + + return self.ensure(_connect_error, _declare_consumer) + + def iterconsume(self, limit=None, timeout=None): + """Return an iterator that will consume from all queues/consumers""" + + info = {'do_consume': True} + + def _error_callback(exc): + if isinstance(exc, socket.timeout): + LOG.exception(_('Timed out waiting for RPC response: %s') % + str(exc)) + raise rpc_common.Timeout() + else: + LOG.exception(_('Failed to consume message from queue: %s') % + str(exc)) + info['do_consume'] = True + + def _consume(): + if info['do_consume']: + queues_head = self.consumers[:-1] + queues_tail = self.consumers[-1] + for queue in queues_head: + queue.consume(nowait=True) + queues_tail.consume(nowait=False) + info['do_consume'] = False + return self.connection.drain_events(timeout=timeout) + + for iteration in itertools.count(0): + if 
limit and iteration >= limit: + raise StopIteration + yield self.ensure(_error_callback, _consume) + + def cancel_consumer_thread(self): + """Cancel a consumer thread""" + if self.consumer_thread is not None: + self.consumer_thread.kill() + try: + self.consumer_thread.wait() + except greenlet.GreenletExit: + pass + self.consumer_thread = None + + def publisher_send(self, cls, topic, msg, **kwargs): + """Send to a publisher based on the publisher class""" + + def _error_callback(exc): + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.exception(_("Failed to publish message to topic " + "'%(topic)s': %(err_str)s") % log_info) + + def _publish(): + publisher = cls(self.channel, topic, **kwargs) + publisher.send(msg) + + self.ensure(_error_callback, _publish) + + def declare_direct_consumer(self, topic, callback): + """Create a 'direct' queue. + In nova's use, this is generally a msg_id queue used for + responses for call/multicall + """ + self.declare_consumer(DirectConsumer, topic, callback) + + def declare_topic_consumer(self, topic, callback=None): + """Create a 'topic' consumer.""" + self.declare_consumer(TopicConsumer, topic, callback) + + def declare_fanout_consumer(self, topic, callback): + """Create a 'fanout' consumer""" + self.declare_consumer(FanoutConsumer, topic, callback) + + def direct_send(self, msg_id, msg): + """Send a 'direct' message""" + self.publisher_send(DirectPublisher, msg_id, msg) + + def topic_send(self, topic, msg): + """Send a 'topic' message""" + self.publisher_send(TopicPublisher, topic, msg) + + def fanout_send(self, topic, msg): + """Send a 'fanout' message""" + self.publisher_send(FanoutPublisher, topic, msg) + + def notify_send(self, topic, msg, **kwargs): + """Send a notify message on a topic""" + self.publisher_send(NotifyPublisher, topic, msg, **kwargs) + + def consume(self, limit=None): + """Consume from all queues/consumers""" + it = self.iterconsume(limit=limit) + while True: + try: + it.next() + except StopIteration: + 
return + + def consume_in_thread(self): + """Consumer from all queues/consumers in a greenthread""" + def _consumer_thread(): + try: + self.consume() + except greenlet.GreenletExit: + return + if self.consumer_thread is None: + self.consumer_thread = eventlet.spawn(_consumer_thread) + return self.consumer_thread + + def create_consumer(self, topic, proxy, fanout=False): + """Create a consumer that calls a method in a proxy object""" + if fanout: + self.declare_fanout_consumer(topic, + rpc_amqp.ProxyCallback(proxy, Connection.pool)) + else: + self.declare_topic_consumer(topic, + rpc_amqp.ProxyCallback(proxy, Connection.pool)) + + +Connection.pool = rpc_amqp.Pool(connection_cls=Connection) + + +def create_connection(new=True): + """Create a connection""" + return rpc_amqp.create_connection(new, Connection.pool) + + +def multicall(context, topic, msg, timeout=None): + """Make a call that returns multiple times.""" + return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool) + + +def call(context, topic, msg, timeout=None): + """Sends a message on a topic and wait for a response.""" + return rpc_amqp.call(context, topic, msg, timeout, Connection.pool) + + +def cast(context, topic, msg): + """Sends a message on a topic without waiting for a response.""" + return rpc_amqp.cast(context, topic, msg, Connection.pool) + + +def fanout_cast(context, topic, msg): + """Sends a message on a fanout exchange without waiting for a response.""" + return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) + + +def cast_to_server(context, server_params, topic, msg): + """Sends a message on a topic to a specific server.""" + return rpc_amqp.cast_to_server(context, server_params, topic, msg, + Connection.pool) + + +def fanout_cast_to_server(context, server_params, topic, msg): + """Sends a message on a fanout exchange to a specific server.""" + return rpc_amqp.cast_to_server(context, server_params, topic, msg, + Connection.pool) + + +def notify(context, topic, msg): 
+ """Sends a notification event on a topic.""" + return rpc_amqp.notify(context, topic, msg, Connection.pool) + + +def cleanup(): + return rpc_amqp.cleanup(Connection.pool) diff --git a/heat/rpc/impl_qpid.py b/heat/rpc/impl_qpid.py new file mode 100644 index 0000000000..95b9ee52da --- /dev/null +++ b/heat/rpc/impl_qpid.py @@ -0,0 +1,552 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC +# Copyright 2011 - 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 

import itertools
import time
import uuid
import json
import logging

import eventlet
import greenlet
import qpid.messaging
import qpid.messaging.exceptions

from heat.common import config
from heat.openstack.common import cfg
from heat.rpc import amqp as rpc_amqp
from heat.rpc import common as rpc_common

LOG = logging.getLogger(__name__)

# Configuration options controlling how this driver talks to the Qpid broker.
qpid_opts = [
    cfg.StrOpt('qpid_hostname',
               default='localhost',
               help='Qpid broker hostname'),
    cfg.StrOpt('qpid_port',
               default='5672',
               help='Qpid broker port'),
    cfg.StrOpt('qpid_username',
               default='',
               help='Username for qpid connection'),
    cfg.StrOpt('qpid_password',
               default='',
               help='Password for qpid connection'),
    cfg.StrOpt('qpid_sasl_mechanisms',
               default='',
               help='Space separated list of SASL mechanisms to use for auth'),
    cfg.BoolOpt('qpid_reconnect',
                default=True,
                help='Automatically reconnect'),
    cfg.IntOpt('qpid_reconnect_timeout',
               default=0,
               help='Reconnection timeout in seconds'),
    cfg.IntOpt('qpid_reconnect_limit',
               default=0,
               help='Max reconnections before giving up'),
    cfg.IntOpt('qpid_reconnect_interval_min',
               default=0,
               help='Minimum seconds between reconnection attempts'),
    cfg.IntOpt('qpid_reconnect_interval_max',
               default=0,
               help='Maximum seconds between reconnection attempts'),
    cfg.IntOpt('qpid_reconnect_interval',
               default=0,
               help='Equivalent to setting max and min to the same value'),
    cfg.IntOpt('qpid_heartbeat',
               default=5,
               help='Seconds between connection keepalive heartbeats'),
    cfg.StrOpt('qpid_protocol',
               default='tcp',
               help="Transport to use, either 'tcp' or 'ssl'"),
    cfg.BoolOpt('qpid_tcp_nodelay',
                default=True,
                help='Disable Nagle algorithm'),
    ]

FLAGS = config.FLAGS
FLAGS.register_opts(qpid_opts)


class ConsumerBase(object):
    """Consumer base class.

    Builds a Qpid address string from a node name plus node/link options,
    opens a receiver for it on the given session and hands the content of
    each fetched message to a callback.
    """

    def __init__(self, session, callback, node_name, node_opts,
                 link_name, link_opts):
        """Declare a queue on an amqp session.

        'session' is the amqp session to use
        'callback' is the callback to call when messages are received
        'node_name' is the first part of the Qpid address string, before ';'
        'node_opts' will be applied to the "x-declare" section of "node"
                    in the address string.
        'link_name' goes into the "name" field of the "link" in the address
                    string
        'link_opts' will be applied to the "x-declare" section of "link"
                    in the address string.
        """
        self.callback = callback
        self.receiver = None
        self.session = None

        # Defaults for the address options; node_opts / link_opts override
        # entries of the respective "x-declare" sections below.
        addr_opts = {
            "create": "always",
            "node": {
                "type": "topic",
                "x-declare": {
                    "durable": True,
                    "auto-delete": True,
                },
            },
            "link": {
                "name": link_name,
                "durable": True,
                "x-declare": {
                    "durable": False,
                    "auto-delete": True,
                    "exclusive": False,
                },
            },
        }
        addr_opts["node"]["x-declare"].update(node_opts)
        addr_opts["link"]["x-declare"].update(link_opts)

        # Address string layout: "<node_name> ; <json options>"
        self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))

        self.reconnect(session)

    def reconnect(self, session):
        """Re-declare the receiver after a qpid reconnect"""
        self.session = session
        self.receiver = session.receiver(self.address)
        self.receiver.capacity = 1

    def consume(self):
        """Fetch the message and pass it to the callback object"""
        message = self.receiver.fetch()
        self.callback(message.content)

    def get_receiver(self):
        """Return the receiver currently bound to this consumer"""
        return self.receiver


class DirectConsumer(ConsumerBase):
    """Queue/consumer class for 'direct'"""

    def __init__(self, session, msg_id, callback):
        """Init a 'direct' queue.

        'session' is the amqp session to use
        'msg_id' is the msg_id to listen on
        'callback' is the callback to call when messages are received
        """

        super(DirectConsumer, self).__init__(session, callback,
                        "%s/%s" % (msg_id, msg_id),
                        {"type": "direct"},
                        msg_id,
                        {"exclusive": True})


class TopicConsumer(ConsumerBase):
    """Consumer class for 'topic'"""

    def __init__(self, session, topic, callback):
        """Init a 'topic' queue.

        'session' is the amqp session to use
        'topic' is the topic to listen on
        'callback' is the callback to call when messages are received
        """

        super(TopicConsumer, self).__init__(session, callback,
                        "%s/%s" % (FLAGS.control_exchange, topic), {},
                        topic, {})


class FanoutConsumer(ConsumerBase):
    """Consumer class for 'fanout'"""

    def __init__(self, session, topic, callback):
        """Init a 'fanout' queue.

        'session' is the amqp session to use
        'topic' is the topic to listen on
        'callback' is the callback to call when messages are received
        """

        # The link name carries a random uuid so every consumer gets its
        # own exclusive link on the shared "<topic>_fanout" node.
        super(FanoutConsumer, self).__init__(session, callback,
                        "%s_fanout" % topic,
                        {"durable": False, "type": "fanout"},
                        "%s_fanout_%s" % (topic, uuid.uuid4().hex),
                        {"exclusive": True})


class Publisher(object):
    """Base Publisher class"""

    def __init__(self, session, node_name, node_opts=None):
        """Init the Publisher class with the exchange_name, routing_key,
        and other options

        'session' is the amqp session to use
        'node_name' is the first part of the Qpid address string, before ';'
        'node_opts' (optional) is applied to the "x-declare" section of
                    "node" in the address string.
        """
        self.sender = None
        self.session = session

        addr_opts = {
            "create": "always",
            "node": {
                "type": "topic",
                "x-declare": {
                    "durable": False,
                    # auto-delete isn't implemented for exchanges in qpid,
                    # but put in here anyway
                    "auto-delete": True,
                },
            },
        }
        if node_opts:
            addr_opts["node"]["x-declare"].update(node_opts)

        self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))

        self.reconnect(session)

    def reconnect(self, session):
        """Re-establish the Sender after a reconnection"""
        self.sender = session.sender(self.address)

    def send(self, msg):
        """Send a message"""
        self.sender.send(msg)


class DirectPublisher(Publisher):
    """Publisher class for 'direct'"""
    def __init__(self, session, msg_id):
        """Init a 'direct' publisher."""
        # NOTE(review): DirectConsumer declares its node with the lowercase
        # type "direct" while this publisher uses "Direct"; value left
        # unchanged here - confirm whether the broker treats the type
        # case-insensitively.
        super(DirectPublisher, self).__init__(session, msg_id,
                                              {"type": "Direct"})


class TopicPublisher(Publisher):
    """Publisher class for 'topic'"""
    def __init__(self, session, topic):
        """init a 'topic' publisher.
        """
        super(TopicPublisher, self).__init__(session,
                                "%s/%s" % (FLAGS.control_exchange, topic))


class FanoutPublisher(Publisher):
    """Publisher class for 'fanout'"""
    def __init__(self, session, topic):
        """init a 'fanout' publisher.
        """
        super(FanoutPublisher, self).__init__(session,
                                "%s_fanout" % topic, {"type": "fanout"})


class NotifyPublisher(Publisher):
    """Publisher class for notifications"""
    def __init__(self, session, topic):
        """init a 'topic' publisher.
        """
        super(NotifyPublisher, self).__init__(session,
                                "%s/%s" % (FLAGS.control_exchange, topic),
                                {"durable": True})


class Connection(object):
    """Connection object."""

    def __init__(self, server_params=None):
        """Configure a qpid connection from FLAGS and open it.

        'server_params' is an optional dict that may override 'hostname',
        'port', 'username' and 'password'; missing keys fall back to the
        corresponding qpid_* flags.  The dict passed in is not modified.
        """
        self.session = None
        self.consumers = {}
        self.consumer_thread = None

        default_params = dict(hostname=FLAGS.qpid_hostname,
                              port=FLAGS.qpid_port,
                              username=FLAGS.qpid_username,
                              password=FLAGS.qpid_password)

        # Work on a copy: defaults first, caller supplied values on top, so
        # the caller's dict is never written to.
        params = dict(default_params)
        params.update(server_params or {})

        self.broker = params['hostname'] + ":" + str(params['port'])
        # Create the connection - this does not open the connection
        self.connection = qpid.messaging.Connection(self.broker)

        # Check if flags are set and if so set them for the connection
        # before we call open
        self.connection.username = params['username']
        self.connection.password = params['password']
        self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms
        self.connection.reconnect = FLAGS.qpid_reconnect
        if FLAGS.qpid_reconnect_timeout:
            self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout
        if FLAGS.qpid_reconnect_limit:
            self.connection.reconnect_limit = FLAGS.qpid_reconnect_limit
        if FLAGS.qpid_reconnect_interval_max:
            self.connection.reconnect_interval_max = (
                    FLAGS.qpid_reconnect_interval_max)
        if FLAGS.qpid_reconnect_interval_min:
            self.connection.reconnect_interval_min = (
                    FLAGS.qpid_reconnect_interval_min)
        if FLAGS.qpid_reconnect_interval:
            self.connection.reconnect_interval = FLAGS.qpid_reconnect_interval
        # Was misspelled 'hearbeat', which only created an unused attribute
        # and left the qpid_heartbeat flag without effect.
        self.connection.heartbeat = FLAGS.qpid_heartbeat
        self.connection.protocol = FLAGS.qpid_protocol
        self.connection.tcp_nodelay = FLAGS.qpid_tcp_nodelay

        # Open is part of reconnect -
        # NOTE(WGH) not sure we need this with the reconnect flags
        self.reconnect()

    def _register_consumer(self, consumer):
        """Remember a consumer, keyed by the string form of its receiver"""
        self.consumers[str(consumer.get_receiver())] = consumer

    def _lookup_consumer(self, receiver):
        """Return the consumer registered for 'receiver' (KeyError if none)"""
        return self.consumers[str(receiver)]

    def reconnect(self):
        """Handles reconnecting and re-establishing sessions and queues"""
        if self.connection.opened():
            try:
                self.connection.close()
            except qpid.messaging.exceptions.ConnectionError:
                pass

        # Keep retrying until the broker accepts the connection.
        while True:
            try:
                self.connection.open()
            except qpid.messaging.exceptions.ConnectionError as e:
                LOG.error(_('Unable to connect to AMQP server: %s ') % e)
                time.sleep(FLAGS.qpid_reconnect_interval or 1)
            else:
                break

        LOG.info(_('Connected to AMQP server on %s') % self.broker)

        self.session = self.connection.session()

        # Receivers belong to the old session; re-declare them on the new one.
        for consumer in self.consumers.itervalues():
            consumer.reconnect(self.session)

        if self.consumers:
            LOG.debug(_("Re-established AMQP queues"))

    def ensure(self, error_callback, method, *args, **kwargs):
        """Call 'method' until it succeeds, reconnecting between attempts.

        On qpid Empty or ConnectionError the optional 'error_callback' is
        invoked with the exception (it may raise to abort the retry loop),
        then the connection is re-established and the call is retried.
        Returns whatever 'method' returns.
        """
        while True:
            try:
                return method(*args, **kwargs)
            except (qpid.messaging.exceptions.Empty,
                    qpid.messaging.exceptions.ConnectionError) as e:
                if error_callback:
                    error_callback(e)
                self.reconnect()

def close(self): + """Close/release this connection""" + self.cancel_consumer_thread() + self.connection.close() + self.connection = None + + def reset(self): + """Reset a connection so it can be used again""" + self.cancel_consumer_thread() + self.session.close() + self.session = self.connection.session() + self.consumers = {} + + def declare_consumer(self, consumer_cls, topic, callback): + """Create a Consumer using the class that was passed in and + add it to our list of consumers + """ + def _connect_error(exc): + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.error(_("Failed to declare consumer for topic '%(topic)s': " + "%(err_str)s") % log_info) + + def _declare_consumer(): + consumer = consumer_cls(self.session, topic, callback) + self._register_consumer(consumer) + return consumer + + return self.ensure(_connect_error, _declare_consumer) + + def iterconsume(self, limit=None, timeout=None): + """Return an iterator that will consume from all queues/consumers""" + + def _error_callback(exc): + if isinstance(exc, qpid.messaging.exceptions.Empty): + LOG.exception(_('Timed out waiting for RPC response: %s') % + str(exc)) + raise rpc_common.Timeout() + else: + LOG.exception(_('Failed to consume message from queue: %s') % + str(exc)) + + def _consume(): + nxt_receiver = self.session.next_receiver(timeout=timeout) + try: + self._lookup_consumer(nxt_receiver).consume() + except Exception: + LOG.exception(_("Error processing message. 
Skipping it.")) + + for iteration in itertools.count(0): + if limit and iteration >= limit: + raise StopIteration + yield self.ensure(_error_callback, _consume) + + def cancel_consumer_thread(self): + """Cancel a consumer thread""" + if self.consumer_thread is not None: + self.consumer_thread.kill() + try: + self.consumer_thread.wait() + except greenlet.GreenletExit: + pass + self.consumer_thread = None + + def publisher_send(self, cls, topic, msg): + """Send to a publisher based on the publisher class""" + + def _connect_error(exc): + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.exception(_("Failed to publish message to topic " + "'%(topic)s': %(err_str)s") % log_info) + + def _publisher_send(): + publisher = cls(self.session, topic) + publisher.send(msg) + + return self.ensure(_connect_error, _publisher_send) + + def declare_direct_consumer(self, topic, callback): + """Create a 'direct' queue. + In nova's use, this is generally a msg_id queue used for + responses for call/multicall + """ + self.declare_consumer(DirectConsumer, topic, callback) + + def declare_topic_consumer(self, topic, callback=None): + """Create a 'topic' consumer.""" + self.declare_consumer(TopicConsumer, topic, callback) + + def declare_fanout_consumer(self, topic, callback): + """Create a 'fanout' consumer""" + self.declare_consumer(FanoutConsumer, topic, callback) + + def direct_send(self, msg_id, msg): + """Send a 'direct' message""" + self.publisher_send(DirectPublisher, msg_id, msg) + + def topic_send(self, topic, msg): + """Send a 'topic' message""" + self.publisher_send(TopicPublisher, topic, msg) + + def fanout_send(self, topic, msg): + """Send a 'fanout' message""" + self.publisher_send(FanoutPublisher, topic, msg) + + def notify_send(self, topic, msg, **kwargs): + """Send a notify message on a topic""" + self.publisher_send(NotifyPublisher, topic, msg) + + def consume(self, limit=None): + """Consume from all queues/consumers""" + it = self.iterconsume(limit=limit) + while 
True: + try: + it.next() + except StopIteration: + return + + def consume_in_thread(self): + """Consumer from all queues/consumers in a greenthread""" + def _consumer_thread(): + try: + self.consume() + except greenlet.GreenletExit: + return + if self.consumer_thread is None: + self.consumer_thread = eventlet.spawn(_consumer_thread) + return self.consumer_thread + + def create_consumer(self, topic, proxy, fanout=False): + """Create a consumer that calls a method in a proxy object""" + if fanout: + consumer = FanoutConsumer(self.session, topic, + rpc_amqp.ProxyCallback(proxy, Connection.pool)) + else: + consumer = TopicConsumer(self.session, topic, + rpc_amqp.ProxyCallback(proxy, Connection.pool)) + self._register_consumer(consumer) + return consumer + + +Connection.pool = rpc_amqp.Pool(connection_cls=Connection) + + +def create_connection(new=True): + """Create a connection""" + return rpc_amqp.create_connection(new, Connection.pool) + + +def multicall(context, topic, msg, timeout=None): + """Make a call that returns multiple times.""" + return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool) + + +def call(context, topic, msg, timeout=None): + """Sends a message on a topic and wait for a response.""" + return rpc_amqp.call(context, topic, msg, timeout, Connection.pool) + + +def cast(context, topic, msg): + """Sends a message on a topic without waiting for a response.""" + return rpc_amqp.cast(context, topic, msg, Connection.pool) + + +def fanout_cast(context, topic, msg): + """Sends a message on a fanout exchange without waiting for a response.""" + return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) + + +def cast_to_server(context, server_params, topic, msg): + """Sends a message on a topic to a specific server.""" + return rpc_amqp.cast_to_server(context, server_params, topic, msg, + Connection.pool) + + +def fanout_cast_to_server(context, server_params, topic, msg): + """Sends a message on a fanout exchange to a specific server.""" 
+ return rpc_amqp.fanout_cast_to_server(context, server_params, topic, + msg, Connection.pool) + + +def notify(context, topic, msg): + """Sends a notification event on a topic.""" + return rpc_amqp.notify(context, topic, msg, Connection.pool) + + +def cleanup(): + return rpc_amqp.cleanup(Connection.pool) diff --git a/heat/service.py b/heat/service.py new file mode 100644 index 0000000000..5f0cfbd48b --- /dev/null +++ b/heat/service.py @@ -0,0 +1,275 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 

"""Generic Node base class for all workers that run on hosts."""

import inspect
import os

import eventlet
import logging
import greenlet

from heat.openstack.common import cfg

from heat.common import utils
from heat.common import config

from heat import context
from heat import exception
from heat import rpc
from heat import version

LOG = logging.getLogger(__name__)

service_opts = [
    cfg.IntOpt('report_interval',
               default=10,
               help='seconds between nodes reporting state to datastore'),
    cfg.IntOpt('periodic_interval',
               default=60,
               help='seconds between running periodic tasks'),
    cfg.StrOpt('ec2_listen',
               default="0.0.0.0",
               help='IP address for EC2 API to listen'),
    cfg.IntOpt('ec2_listen_port',
               default=8773,
               help='port for ec2 api to listen'),
    cfg.StrOpt('osapi_compute_listen',
               default="0.0.0.0",
               help='IP address for OpenStack API to listen'),
    cfg.IntOpt('osapi_compute_listen_port',
               default=8774,
               help='list port for osapi compute'),
    cfg.StrOpt('metadata_manager',
               default='nova.api.manager.MetadataManager',
               help='OpenStack metadata service manager'),
    cfg.StrOpt('metadata_listen',
               default="0.0.0.0",
               help='IP address for metadata api to listen'),
    cfg.IntOpt('metadata_listen_port',
               default=8775,
               help='port for metadata api to listen'),
    cfg.StrOpt('osapi_volume_listen',
               default="0.0.0.0",
               help='IP address for OpenStack Volume API to listen'),
    cfg.IntOpt('osapi_volume_listen_port',
               default=8776,
               help='port for os volume api to listen'),
    ]

FLAGS = config.FLAGS
FLAGS.register_opts(service_opts)


class Launcher(object):
    """Launch one or more services and wait for them to complete."""

    def __init__(self):
        """Initialize the service launcher.

        :returns: None

        """
        # Green threads spawned by launch_server(), one per server.
        self._services = []

    @staticmethod
    def run_server(server):
        """Start and wait for a server to finish.

        :param server: Server to run and wait for.
        :returns: None

        """
        server.start()
        server.wait()

    def launch_server(self, server):
        """Load and start the given server.

        :param server: The server you would like to start.
        :returns: None

        """
        gt = eventlet.spawn(self.run_server, server)
        self._services.append(gt)

    def stop(self):
        """Stop all services which are currently running.

        :returns: None

        """
        for service in self._services:
            service.kill()

    def wait(self):
        """Waits until all services have been stopped, and then returns.

        :returns: None

        """
        for service in self._services:
            try:
                service.wait()
            except greenlet.GreenletExit:
                # A killed green thread is an expected way to finish.
                pass


class Service(object):
    """Service object for binaries running on hosts.

    A service takes a manager and enables rpc by listening to queues based
    on topic. It also periodically runs tasks on the manager and reports
    its state to the database services table."""

    def __init__(self, host, binary, topic, manager,
                 periodic_interval=None, *args, **kwargs):
        """Import and instantiate the manager class and store settings.

        :param host: name of this node, passed to the manager as ``host``
        :param binary: name of the executable this service runs as
        :param topic: rpc topic the service consumes from
        :param manager: dotted path of the manager class to import
        :param periodic_interval: seconds between periodic task runs;
                                  a false value disables them in start()
        :param args: extra positional arguments for the manager class
        :param kwargs: extra keyword arguments for the manager class

        """
        self.host = host
        self.binary = binary
        self.topic = topic
        self.manager_class_name = manager
        manager_class = utils.import_class(self.manager_class_name)
        self.manager = manager_class(host=self.host, *args, **kwargs)
        self.periodic_interval = periodic_interval
        # NOTE(review): the manager's extra arguments are also forwarded to
        # object.__init__ here; create() never passes any, but non-empty
        # args/kwargs may be rejected by object.__init__ - confirm before
        # relying on them.
        super(Service, self).__init__(*args, **kwargs)
        self.saved_args, self.saved_kwargs = args, kwargs
        self.timers = []

    def start(self):
        """Initialise the manager, set up rpc consumers and periodic tasks."""
        vcs_string = version.version_string_with_vcs()
        LOG.info(_('Starting %(topic)s node (version %(vcs_string)s)'),
                  {'topic': self.topic, 'vcs_string': vcs_string})
        # TODO do we need this ? -> utils.cleanup_file_locks()
        self.manager.init_host()
        self.model_disconnected = False
        # Currently only needed by the disabled _create_service_ref() call.
        ctxt = context.get_admin_context()
        # self._create_service_ref(ctxt)

        self.conn = rpc.create_connection(new=True)
        LOG.debug(_("Creating Consumer connection for Service %s") %
                  self.topic)

        # Share this same connection for these Consumers
        self.conn.create_consumer(self.topic, self, fanout=False)

        node_topic = '%s.%s' % (self.topic, self.host)
        self.conn.create_consumer(node_topic, self, fanout=False)

        self.conn.create_consumer(self.topic, self, fanout=True)

        # Consume from all consumers in a thread
        self.conn.consume_in_thread()

        if self.periodic_interval:
            periodic = utils.LoopingCall(self.periodic_tasks)
            periodic.start(interval=self.periodic_interval, now=False)
            self.timers.append(periodic)

    def __getattr__(self, key):
        """Delegate lookups of unknown attributes to the manager.

        The manager is read from __dict__ directly so that a missing
        'manager' attribute does not re-enter __getattr__.
        """
        manager = self.__dict__.get('manager', None)
        return getattr(manager, key)

    @classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None,
               periodic_interval=None):
        """Instantiates class and passes back application object.

        :param host: defaults to FLAGS.host
        :param binary: defaults to basename of executable
        :param topic: defaults to bin_name - 'heat-' part
        :param manager: defaults to FLAGS.<topic>_manager
        :param periodic_interval: defaults to FLAGS.periodic_interval

        """
        if not host:
            host = FLAGS.host
        if not binary:
            # The outermost stack frame belongs to the script being run.
            binary = os.path.basename(inspect.stack()[-1][1])
        if not topic:
            topic = binary.rpartition('heat-')[2]
        if not manager:
            manager = FLAGS.get('%s_manager' % topic, None)
        if not periodic_interval:
            periodic_interval = FLAGS.periodic_interval
        service_obj = cls(host, binary, topic, manager,
                          periodic_interval)

        return service_obj

    def kill(self):
        """Alias for stop()."""
        self.stop()

    def stop(self):
        """Close the rpc connection and stop all periodic timers."""
        # Try to shut the connection down, but if we get any sort of
        # errors, go ahead and ignore them.. as we're shutting down anyway
        try:
            self.conn.close()
        except Exception:
            pass
        for x in self.timers:
            try:
                x.stop()
            except Exception:
                pass
        self.timers = []

    def wait(self):
        """Wait for every periodic timer to finish, ignoring their errors."""
        for x in self.timers:
            try:
                x.wait()
            except Exception:
                pass

    def periodic_tasks(self, raise_on_error=False):
        """Tasks to be run at a periodic interval."""
        ctxt = context.get_admin_context()
        self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)


# NOTE(vish): the global launcher is to maintain the existing
#             functionality of calling service.serve +
#             service.wait
_launcher = None


def serve(*servers):
    """Launch each given server on the module-wide launcher.

    The launcher is created on first use and reused by later calls.
    """
    global _launcher
    if not _launcher:
        _launcher = Launcher()
    for server in servers:
        _launcher.launch_server(server)


def wait():
    """Log all flags, wait for the launched servers, then clean up rpc.

    Values of flags that look like secrets are not written to the log.
    A KeyboardInterrupt while waiting stops the launched servers.
    """
    LOG.debug(_('Full set of FLAGS:'))
    for flag in FLAGS:
        flag_get = FLAGS.get(flag, None)
        # hide flag contents from log if contains a password
        # should use secret flag when switch over to openstack-common
        # An unset sql_connection (None) has nothing to hide and must not be
        # searched with 'in'.
        is_secret = ("_password" in flag or "_key" in flag or
                     (flag == "sql_connection" and
                      flag_get is not None and "mysql:" in flag_get))
        if is_secret:
            LOG.debug(_('%(flag)s : FLAG SET ') % {'flag': flag})
        else:
            LOG.debug('%(flag)s : %(flag_get)s' %
                      {'flag': flag, 'flag_get': flag_get})
    try:
        # serve() may never have been called; then there is nothing to
        # wait for and only the rpc cleanup below runs.
        if _launcher is not None:
            _launcher.wait()
    except KeyboardInterrupt:
        _launcher.stop()
    rpc.cleanup()