diff --git a/etc/reddwarf/reddwarf.conf.sample b/etc/reddwarf/reddwarf.conf.sample index b17aea614f..de57750c20 100644 --- a/etc/reddwarf/reddwarf.conf.sample +++ b/etc/reddwarf/reddwarf.conf.sample @@ -11,6 +11,9 @@ bind_host = 0.0.0.0 # Port the bind the API server to bind_port = 8779 +# AMQP Connection info +rabbit_password=f7999d1955c5014aa32c + # SQLAlchemy connection string for the reference implementation # registry server. Any valid SQLAlchemy connection string is fine. # See: http://www.sqlalchemy.org/docs/05/reference/sqlalchemy/connections.html#sqlalchemy.create_engine diff --git a/next_steps.txt b/next_steps.txt index 2956559a79..16ec03260e 100644 --- a/next_steps.txt +++ b/next_steps.txt @@ -1,3 +1,4 @@ -guest agent +update impl_qpid so it works with reddwarf fix the defaults problem w/ the flagfile... its not overriding +better implementation of a python guest agent initial mysql hardening diff --git a/openstack-common.conf b/openstack-common.conf index db94c4a8cd..6a89f7c889 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,7 +1,7 @@ [DEFAULT] # The list of modules to copy from openstack-common -modules=config,exception,extensions,utils,wsgi,setup +modules=config,context,exception,extensions,utils,wsgi,setup # The base module to hold the copy of openstack.common base=reddwarf diff --git a/reddwarf/common/context.py b/reddwarf/common/context.py new file mode 100644 index 0000000000..d43f9be33c --- /dev/null +++ b/reddwarf/common/context.py @@ -0,0 +1,50 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Simple class that stores security context information in the web request. + +Projects should subclass this class if they wish to enhance the request +context or provide additional information in their specific WSGI pipeline. +""" + +from reddwarf.openstack.common import context +from reddwarf.common import utils + +class ReddwarfContext(context.RequestContext): + + """ + Stores information about the security context under which the user + accesses the system, as well as additional request information. 
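+
+    Illustrative usage (the values below are examples, not part of this
+    change):
+
+        ctx = ReddwarfContext(user='demo', tenant='t-1000')
+        payload = ctx.to_dict()               # serializable, e.g. for rpc
+        ctx2 = ReddwarfContext.from_dict(payload)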
+ """ + + def __init__(self, **kwargs): + super(ReddwarfContext, self).__init__(**kwargs) + + def to_dict(self): +# self.auth_tok = auth_tok + return {'user': self.user, + 'tenant': self.tenant, + 'is_admin': self.is_admin, + 'show_deleted': self.show_deleted, + 'read_only': self.read_only, + 'auth_tok': self.auth_tok, + } + + @classmethod + def from_dict(cls, values): + return cls(**values) diff --git a/reddwarf/common/exception.py b/reddwarf/common/exception.py index 0908d46bac..495948262f 100644 --- a/reddwarf/common/exception.py +++ b/reddwarf/common/exception.py @@ -22,7 +22,7 @@ from reddwarf.openstack.common import exception as openstack_exception ClientConnectionError = openstack_exception.ClientConnectionError ProcessExecutionError = openstack_exception.ProcessExecutionError DatabaseMigrationError = openstack_exception.DatabaseMigrationError - +wrap_exception = openstack_exception.wrap_exception class ReddwarfError(openstack_exception.OpenstackException): """Base exception that all custom reddwarf app exceptions inherit from.""" @@ -38,4 +38,9 @@ class DBConstraintError(ReddwarfError): message = _("Failed to save %(model_name)s because: %(error)s") +class InvalidRPCConnectionReuse(ReddwarfError): + + message = _("Invalid RPC Connection Reuse") + + diff --git a/reddwarf/common/local.py b/reddwarf/common/local.py new file mode 100644 index 0000000000..19d962732c --- /dev/null +++ b/reddwarf/common/local.py @@ -0,0 +1,37 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +"""Greenthread local storage of variables using weak references""" + +import weakref + +from eventlet import corolocal + + +class WeakLocal(corolocal.local): + def __getattribute__(self, attr): + rval = corolocal.local.__getattribute__(self, attr) + if rval: + rval = rval() + return rval + + def __setattr__(self, attr, value): + value = weakref.ref(value) + return corolocal.local.__setattr__(self, attr, value) + + +store = WeakLocal() diff --git a/reddwarf/common/utils.py b/reddwarf/common/utils.py index 75348a2c34..31f84256ca 100644 --- a/reddwarf/common/utils.py +++ b/reddwarf/common/utils.py @@ -27,7 +27,7 @@ from reddwarf.openstack.common import utils as openstack_utils import_class = openstack_utils.import_class import_object = openstack_utils.import_object bool_from_string = openstack_utils.bool_from_string - +isotime = openstack_utils.isotime def stringify_keys(dictionary): if dictionary is None: diff --git a/reddwarf/database/service.py b/reddwarf/database/service.py index 64fde4d795..f7ddc23aa9 100644 --- a/reddwarf/database/service.py +++ b/reddwarf/database/service.py @@ -24,6 +24,8 @@ from reddwarf.common import config from reddwarf.common import wsgi from reddwarf.database import models from reddwarf.database import views +from reddwarf.common import context +from reddwarf import rpc LOG = logging.getLogger('reddwarf.database.service') diff --git a/reddwarf/openstack/common/context.py b/reddwarf/openstack/common/context.py new file mode 100644 index 0000000000..a9a16f8e5c --- /dev/null +++ b/reddwarf/openstack/common/context.py @@ -0,0 +1,40 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Simple class that stores security context information in the web request. + +Projects should subclass this class if they wish to enhance the request +context or provide additional information in their specific WSGI pipeline. +""" + + +class RequestContext(object): + + """ + Stores information about the security context under which the user + accesses the system, as well as additional request information. 
+ """ + + def __init__(self, auth_tok=None, user=None, tenant=None, is_admin=False, + read_only=False, show_deleted=False): + self.auth_tok = auth_tok + self.user = user + self.tenant = tenant + self.is_admin = is_admin + self.read_only = read_only + self.show_deleted = show_deleted diff --git a/reddwarf/openstack/common/setup.py b/reddwarf/openstack/common/setup.py index 28f5e6f51d..9eabfcca3f 100644 --- a/reddwarf/openstack/common/setup.py +++ b/reddwarf/openstack/common/setup.py @@ -36,10 +36,13 @@ def parse_mailmap(mailmap='.mailmap'): return mapping -def str_dict_replace(s, mapping): - for s1, s2 in mapping.iteritems(): - s = s.replace(s1, s2) - return s +def canonicalize_emails(changelog, mapping): + """ Takes in a string and an email alias mapping and replaces all + instances of the aliases in the string with their real email + """ + for alias, email in mapping.iteritems(): + changelog = changelog.replace(alias, email) + return changelog # Get requirements from the first file that exists @@ -85,3 +88,40 @@ def write_requirements(): stdout=subprocess.PIPE) requirements = output.communicate()[0].strip() req_file.write(requirements) + + +def _run_shell_command(cmd): + output = subprocess.Popen(["/bin/sh", "-c", cmd], + stdout=subprocess.PIPE) + return output.communicate()[0].strip() + + +def write_vcsversion(location): + """ Produce a vcsversion dict that mimics the old one produced by bzr + """ + if os.path.isdir('.git'): + branch_nick_cmd = 'git branch | grep -Ei "\* (.*)" | cut -f2 -d" "' + branch_nick = _run_shell_command(branch_nick_cmd) + revid_cmd = "git rev-parse HEAD" + revid = _run_shell_command(revid_cmd).split()[0] + revno_cmd = "git log --oneline | wc -l" + revno = _run_shell_command(revno_cmd) + with open(location, 'w') as version_file: + version_file.write(""" +# This file is automatically generated by setup.py, So don't edit it. 
:) +version_info = { + 'branch_nick': '%s', + 'revision_id': '%s', + 'revno': %s +} +""" % (branch_nick, revid, revno)) + + +def write_git_changelog(): + """ Write a changelog based on the git changelog """ + if os.path.isdir('.git'): + git_log_cmd = 'git log --stat' + changelog = _run_shell_command(git_log_cmd) + mailmap = parse_mailmap() + with open("ChangeLog", "w") as changelog_file: + changelog_file.write(canonicalize_emails(changelog, mailmap)) diff --git a/reddwarf/openstack/common/utils.py b/reddwarf/openstack/common/utils.py index e2ef785d77..62686e53c6 100644 --- a/reddwarf/openstack/common/utils.py +++ b/reddwarf/openstack/common/utils.py @@ -28,11 +28,12 @@ import sys from eventlet import greenthread from eventlet.green import subprocess +import iso8601 from reddwarf.openstack.common import exception -TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" +TIME_FORMAT = "%Y-%m-%dT%H:%M:%S" LOG = logging.getLogger(__name__) @@ -163,13 +164,29 @@ def import_object(import_str): def isotime(at=None): + """Stringify time in ISO 8601 format""" if not at: at = datetime.datetime.utcnow() - return at.strftime(TIME_FORMAT) + str = at.strftime(TIME_FORMAT) + tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC' + str += ('Z' if tz == 'UTC' else tz) + return str def parse_isotime(timestr): - return datetime.datetime.strptime(timestr, TIME_FORMAT) + """Parse time from ISO 8601 format""" + try: + return iso8601.parse_date(timestr) + except iso8601.ParseError as e: + raise ValueError(e.message) + except TypeError as e: + raise ValueError(e.message) + + +def normalize_time(timestamp): + """Normalize time in arbitrary timezone to UTC""" + offset = timestamp.utcoffset() + return timestamp.replace(tzinfo=None) - offset if offset else timestamp def utcnow(): diff --git a/reddwarf/rpc/__init__.py b/reddwarf/rpc/__init__.py new file mode 100644 index 0000000000..66f622a02f --- /dev/null +++ b/reddwarf/rpc/__init__.py @@ -0,0 +1,198 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from reddwarf.common import utils +from reddwarf.common import config + +LOG = logging.getLogger(__name__) + +rpc_backend_opt = config.Config.get('rpc_backend','reddwarf.rpc.impl_kombu') + +def create_connection(new=True): + """Create a connection to the message bus used for rpc. + + For some example usage of creating a connection and some consumers on that + connection, see nova.service. + + :param new: Whether or not to create a new connection. A new connection + will be created by default. If new is False, the + implementation is free to return an existing connection from a + pool. + + :returns: An instance of nova.rpc.common.Connection + """ + return _get_impl().create_connection(new=new) + + +def call(context, topic, msg, timeout=None): + """Invoke a remote method that returns something. 
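+
+    A minimal illustrative use (the topic and method names below are
+    assumptions, not defined by this change):
+
+        result = rpc.call(ctxt, 'guestagent.some-instance-id',
+                          {'method': 'list_users', 'args': {}})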
+ + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + nova.rpc.common.Connection.create_consumer() and only applies + when the consumer was created with fanout=False. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + :param timeout: int, number of seconds to use for a response timeout. + If set, this overrides the rpc_response_timeout option. + + :returns: A dict from the remote method. + + :raises: nova.rpc.common.Timeout if a complete response is not received + before the timeout is reached. + """ + return _get_impl().call(context, topic, msg, timeout) + + +def cast(context, topic, msg): + """Invoke a remote method that does not return anything. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + nova.rpc.common.Connection.create_consumer() and only applies + when the consumer was created with fanout=False. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().cast(context, topic, msg) + + +def fanout_cast(context, topic, msg): + """Broadcast a remote method invocation with no return. + + This method will get invoked on all consumers that were set up with this + topic name and fanout=True. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + nova.rpc.common.Connection.create_consumer() and only applies + when the consumer was created with fanout=True. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().fanout_cast(context, topic, msg) + + +def multicall(context, topic, msg, timeout=None): + """Invoke a remote method and get back an iterator. + + In this case, the remote method will be returning multiple values in + separate messages, so the return values can be processed as the come in via + an iterator. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + nova.rpc.common.Connection.create_consumer() and only applies + when the consumer was created with fanout=False. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + :param timeout: int, number of seconds to use for a response timeout. + If set, this overrides the rpc_response_timeout option. + + :returns: An iterator. The iterator will yield a tuple (N, X) where N is + an index that starts at 0 and increases by one for each value + returned and X is the Nth value that was returned by the remote + method. + + :raises: nova.rpc.common.Timeout if a complete response is not received + before the timeout is reached. + """ + return _get_impl().multicall(context, topic, msg, timeout) + + +def notify(context, topic, msg): + """Send notification event. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the notification to. + :param msg: This is a dict of content of event. 
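+                For illustration only (the keys are assumptions, not fixed by
+                this API): {'event_type': 'instance.create', 'payload': {...}}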
+ + :returns: None + """ + return _get_impl().notify(context, topic, msg) + + +def cleanup(): + """Clean up resoruces in use by implementation. + + Clean up any resources that have been allocated by the RPC implementation. + This is typically open connections to a messaging service. This function + would get called before an application using this API exits to allow + connections to get torn down cleanly. + + :returns: None + """ + return _get_impl().cleanup() + + +def cast_to_server(context, server_params, topic, msg): + """Invoke a remote method that does not return anything. + + :param context: Information that identifies the user that has made this + request. + :param server_params: Connection information + :param topic: The topic to send the notification to. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().cast_to_server(context, server_params, topic, msg) + + +def fanout_cast_to_server(context, server_params, topic, msg): + """Broadcast to a remote method invocation with no return. + + :param context: Information that identifies the user that has made this + request. + :param server_params: Connection information + :param topic: The topic to send the notification to. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().fanout_cast_to_server(context, server_params, topic, + msg) + + +_RPCIMPL = None + + +def _get_impl(): + """Delay import of rpc_backend until FLAGS are loaded.""" + global _RPCIMPL + if _RPCIMPL is None: + _RPCIMPL = utils.import_object(rpc_backend_opt) + return _RPCIMPL diff --git a/reddwarf/rpc/amqp.py b/reddwarf/rpc/amqp.py new file mode 100644 index 0000000000..263a4b810c --- /dev/null +++ b/reddwarf/rpc/amqp.py @@ -0,0 +1,383 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 - 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Shared code between AMQP based nova.rpc implementations. + +The code in this module is shared between the rpc implemenations based on AMQP. +Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses +AMQP, but is deprecated and predates this code. 
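+
+For illustration, a message produced by pack_context() (plus the _msg_id key
+added for call/multicall) looks roughly like this; the method, args, and
+context values are examples only:
+
+    {'method': 'create_instance',
+     'args': {'name': 'db1'},
+     '_msg_id': '<uuid4 hex>',
+     '_context_user': 'demo',
+     '_context_tenant': 't-1000',
+     '_context_is_admin': False}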
+""" + +import inspect +import logging +import sys +import traceback +import uuid + +from eventlet import greenpool +from eventlet import pools + +from reddwarf.common import config +from reddwarf.common import exception +from reddwarf.common import local +import reddwarf.rpc.common as rpc_common +from reddwarf.common import context + + +LOG = logging.getLogger(__name__) + +class Pool(pools.Pool): + """Class that implements a Pool of Connections.""" + def __init__(self, *args, **kwargs): + self.connection_cls = kwargs.pop("connection_cls", None) + kwargs.setdefault("max_size", config.Config.get('rpc_conn_pool_size', 30)) + kwargs.setdefault("order_as_stack", True) + super(Pool, self).__init__(*args, **kwargs) + + # TODO(comstud): Timeout connections not used in a while + def create(self): + LOG.debug('Pool creating new connection') + return self.connection_cls() + + def empty(self): + while self.free_items: + self.get().close() + + +class ConnectionContext(rpc_common.Connection): + """The class that is actually returned to the caller of + create_connection(). This is a essentially a wrapper around + Connection that supports 'with' and can return a new Connection or + one from a pool. It will also catch when an instance of this class + is to be deleted so that we can return Connections to the pool on + exceptions and so forth without making the caller be responsible for + catching all exceptions and making sure to return a connection to + the pool. + """ + + def __init__(self, connection_pool, pooled=True, server_params=None): + """Create a new connection, or get one from the pool""" + self.connection = None + self.connection_pool = connection_pool + if pooled: + self.connection = connection_pool.get() + else: + self.connection = connection_pool.connection_cls( + server_params=server_params) + self.pooled = pooled + + def __enter__(self): + """When with ConnectionContext() is used, return self""" + return self + + def _done(self): + """If the connection came from a pool, clean it up and put it back. + If it did not come from a pool, close it. + """ + if self.connection: + if self.pooled: + # Reset the connection so it's ready for the next caller + # to grab from the pool + self.connection.reset() + self.connection_pool.put(self.connection) + else: + try: + self.connection.close() + except Exception: + pass + self.connection = None + + def __exit__(self, exc_type, exc_value, tb): + """End of 'with' statement. We're done here.""" + self._done() + + def __del__(self): + """Caller is done with this connection. Make sure we cleaned up.""" + self._done() + + def close(self): + """Caller is done with this connection.""" + self._done() + + def create_consumer(self, topic, proxy, fanout=False): + self.connection.create_consumer(topic, proxy, fanout) + + def consume_in_thread(self): + self.connection.consume_in_thread() + + def __getattr__(self, key): + """Proxy all other calls to the Connection instance""" + if self.connection: + return getattr(self.connection, key) + else: + raise exception.InvalidRPCConnectionReuse() + + +def msg_reply(msg_id, connection_pool, reply=None, failure=None, ending=False): + """Sends a reply or an error on the channel signified by msg_id. + + Failure should be a sys.exc_info() tuple. 
+ + """ + with ConnectionContext(connection_pool) as conn: + if failure: + message = str(failure[1]) + tb = traceback.format_exception(*failure) + LOG.error(_("Returning exception %s to caller"), message) + LOG.error(tb) + failure = (failure[0].__name__, str(failure[1]), tb) + + try: + msg = {'result': reply, 'failure': failure} + except TypeError: + msg = {'result': dict((k, repr(v)) + for k, v in reply.__dict__.iteritems()), + 'failure': failure} + if ending: + msg['ending'] = True + conn.direct_send(msg_id, msg) + + +class RpcContext(context.ReddwarfContext): + """Context that supports replying to a rpc.call""" + def __init__(self, *args, **kwargs): + self.msg_id = kwargs.pop('msg_id', None) + super(RpcContext, self).__init__(*args, **kwargs) + + def reply(self, reply=None, failure=None, ending=False, + connection_pool=None): + if self.msg_id: + msg_reply(self.msg_id, connection_pool, reply, failure, + ending) + if ending: + self.msg_id = None + + +def unpack_context(msg): + """Unpack context from msg.""" + context_dict = {} + for key in list(msg.keys()): + # NOTE(vish): Some versions of python don't like unicode keys + # in kwargs. + key = str(key) + if key.startswith('_context_'): + value = msg.pop(key) + context_dict[key[9:]] = value + context_dict['msg_id'] = msg.pop('_msg_id', None) + ctx = RpcContext.from_dict(context_dict) + LOG.debug(_('unpacked context: %s'), ctx.to_dict()) + return ctx + + +def pack_context(msg, context): + """Pack context into msg. + + Values for message keys need to be less than 255 chars, so we pull + context out into a bunch of separate keys. If we want to support + more arguments in rabbit messages, we may want to do the same + for args at some point. + + """ + context_d = dict([('_context_%s' % key, value) + for (key, value) in context.to_dict().iteritems()]) + msg.update(context_d) + + +class ProxyCallback(object): + """Calls methods on a proxy object based on method and args.""" + + def __init__(self, proxy, connection_pool): + self.proxy = proxy + self.pool = greenpool.GreenPool(config.Config.get('rpc_thread_pool_size',1024)) + self.connection_pool = connection_pool + + def __call__(self, message_data): + """Consumer callback to call a method on a proxy object. + + Parses the message for validity and fires off a thread to call the + proxy object method. + + Message data should be a dictionary with two keys: + method: string representing the method to call + args: dictionary of arg: value + + Example: {'method': 'echo', 'args': {'value': 42}} + + """ + # It is important to clear the context here, because at this point + # the previous context is stored in local.store.context + if hasattr(local.store, 'context'): + del local.store.context + rpc_common._safe_log(LOG.debug, _('received %s'), message_data) + ctxt = unpack_context(message_data) + method = message_data.get('method') + args = message_data.get('args', {}) + if not method: + LOG.warn(_('no method for message: %s') % message_data) + ctxt.reply(_('No method for message: %s') % message_data, + connection_pool=self.connection_pool) + return + self.pool.spawn_n(self._process_data, ctxt, method, args) + + @exception.wrap_exception + def _process_data(self, ctxt, method, args): + """Thread that magically looks for a method on the proxy + object and calls it. + """ + + try: + node_func = getattr(self.proxy, str(method)) + node_args = dict((str(k), v) for k, v in args.iteritems()) + # NOTE(vish): magic is fun! 
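+            # e.g. {'method': 'echo', 'args': {'value': 42}} resolves to
+            # self.proxy.echo(context=ctxt, value=42)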
+ rval = node_func(context=ctxt, **node_args) + # Check if the result was a generator + if inspect.isgenerator(rval): + for x in rval: + ctxt.reply(x, None, connection_pool=self.connection_pool) + else: + ctxt.reply(rval, None, connection_pool=self.connection_pool) + # This final None tells multicall that it is done. + ctxt.reply(ending=True, connection_pool=self.connection_pool) + except Exception as e: + LOG.exception('Exception during message handling') + ctxt.reply(None, sys.exc_info(), + connection_pool=self.connection_pool) + return + + +class MulticallWaiter(object): + def __init__(self, connection, timeout): + self._connection = connection + self._iterator = connection.iterconsume( + timeout=timeout or config.Config.get('rpc_response_timeout', 3600)) + self._result = None + self._done = False + self._got_ending = False + + def done(self): + if self._done: + return + self._done = True + self._iterator.close() + self._iterator = None + self._connection.close() + + def __call__(self, data): + """The consume() callback will call this. Store the result.""" + if data['failure']: + self._result = rpc_common.RemoteError(*data['failure']) + elif data.get('ending', False): + self._got_ending = True + else: + self._result = data['result'] + + def __iter__(self): + """Return a result until we get a 'None' response from consumer""" + if self._done: + raise StopIteration + while True: + self._iterator.next() + if self._got_ending: + self.done() + raise StopIteration + result = self._result + if isinstance(result, Exception): + self.done() + raise result + yield result + + +def create_connection(new, connection_pool): + """Create a connection""" + return ConnectionContext(connection_pool, pooled=not new) + + +def multicall(context, topic, msg, timeout, connection_pool): + """Make a call that returns multiple times.""" + # Can't use 'with' for multicall, as it returns an iterator + # that will continue to use the connection. 
When it's done, + # connection.close() will get called which will put it back into + # the pool + LOG.debug(_('Making asynchronous call on %s ...'), topic) + msg_id = uuid.uuid4().hex + msg.update({'_msg_id': msg_id}) + LOG.debug(_('MSG_ID is %s') % (msg_id)) + pack_context(msg, context) + + conn = ConnectionContext(connection_pool) + wait_msg = MulticallWaiter(conn, timeout) + conn.declare_direct_consumer(msg_id, wait_msg) + conn.topic_send(topic, msg) + return wait_msg + + +def call(context, topic, msg, timeout, connection_pool): + """Sends a message on a topic and wait for a response.""" + rv = multicall(context, topic, msg, timeout, connection_pool) + # NOTE(vish): return the last result from the multicall + rv = list(rv) + if not rv: + return + return rv[-1] + + +def cast(context, topic, msg, connection_pool): + """Sends a message on a topic without waiting for a response.""" + LOG.debug(_('Making asynchronous cast on %s...'), topic) + pack_context(msg, context) + with ConnectionContext(connection_pool) as conn: + conn.topic_send(topic, msg) + + +def fanout_cast(context, topic, msg, connection_pool): + """Sends a message on a fanout exchange without waiting for a response.""" + LOG.debug(_('Making asynchronous fanout cast...')) + pack_context(msg, context) + with ConnectionContext(connection_pool) as conn: + conn.fanout_send(topic, msg) + + +def cast_to_server(context, server_params, topic, msg, connection_pool): + """Sends a message on a topic to a specific server.""" + pack_context(msg, context) + with ConnectionContext(connection_pool, pooled=False, + server_params=server_params) as conn: + conn.topic_send(topic, msg) + + +def fanout_cast_to_server(context, server_params, topic, msg, + connection_pool): + """Sends a message on a fanout exchange to a specific server.""" + pack_context(msg, context) + with ConnectionContext(connection_pool, pooled=False, + server_params=server_params) as conn: + conn.fanout_send(topic, msg) + + +def notify(context, topic, msg, connection_pool): + """Sends a notification event on a topic.""" + LOG.debug(_('Sending notification on %s...'), topic) + pack_context(msg, context) + with ConnectionContext(connection_pool) as conn: + conn.notify_send(topic, msg) + + +def cleanup(connection_pool): + connection_pool.empty() diff --git a/reddwarf/rpc/common.py b/reddwarf/rpc/common.py new file mode 100644 index 0000000000..f206de73c5 --- /dev/null +++ b/reddwarf/rpc/common.py @@ -0,0 +1,128 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import logging + +from reddwarf.common import config +from reddwarf.common import exception + + +LOG = logging.getLogger(__name__) + +class RemoteError(exception.ReddwarfError): + """Signifies that a remote class has raised an exception. 
+ + Contains a string representation of the type of the original exception, + the value of the original exception, and the traceback. These are + sent to the parent as a joined string so printing the exception + contains all of the relevant info. + + """ + message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") + + def __init__(self, exc_type=None, value=None, traceback=None): + self.exc_type = exc_type + self.value = value + self.traceback = traceback + super(RemoteError, self).__init__(exc_type=exc_type, + value=value, + traceback=traceback) + + +class Timeout(exception.ReddwarfError): + """Signifies that a timeout has occurred. + + This exception is raised if the rpc_response_timeout is reached while + waiting for a response from the remote side. + """ + message = _("Timeout while waiting on RPC response.") + + +class Connection(object): + """A connection, returned by rpc.create_connection(). + + This class represents a connection to the message bus used for rpc. + An instance of this class should never be created by users of the rpc API. + Use rpc.create_connection() instead. + """ + def close(self): + """Close the connection. + + This method must be called when the connection will no longer be used. + It will ensure that any resources associated with the connection, such + as a network connection, and cleaned up. + """ + raise NotImplementedError() + + def create_consumer(self, topic, proxy, fanout=False): + """Create a consumer on this connection. + + A consumer is associated with a message queue on the backend message + bus. The consumer will read messages from the queue, unpack them, and + dispatch them to the proxy object. The contents of the message pulled + off of the queue will determine which method gets called on the proxy + object. + + :param topic: This is a name associated with what to consume from. + Multiple instances of a service may consume from the same + topic. For example, all instances of nova-compute consume + from a queue called "compute". In that case, the + messages will get distributed amongst the consumers in a + round-robin fashion if fanout=False. If fanout=True, + every consumer associated with this topic will get a + copy of every message. + :param proxy: The object that will handle all incoming messages. + :param fanout: Whether or not this is a fanout topic. See the + documentation for the topic parameter for some + additional comments on this. + """ + raise NotImplementedError() + + def consume_in_thread(self): + """Spawn a thread to handle incoming messages. + + Spawn a thread that will be responsible for handling all incoming + messages for consumers that were set up on this connection. + + Message dispatching inside of this is expected to be implemented in a + non-blocking manner. An example implementation would be having this + thread pull messages in for all of the consumers, but utilize a thread + pool for dispatching the messages to the proxy objects. 
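+
+        A typical consumer-side sketch (the topic and proxy object are
+        placeholders, not part of this change):
+
+            conn = rpc.create_connection(new=True)
+            conn.create_consumer('guestagent.some-instance-id', manager_proxy)
+            conn.consume_in_thread()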
+ """ + raise NotImplementedError() + + +def _safe_log(log_func, msg, msg_data): + """Sanitizes the msg_data field before logging.""" + SANITIZE = { + 'set_admin_password': ('new_pass',), + 'run_instance': ('admin_password',), + } + method = msg_data['method'] + if method in SANITIZE: + msg_data = copy.deepcopy(msg_data) + args_to_sanitize = SANITIZE[method] + for arg in args_to_sanitize: + try: + msg_data['args'][arg] = "" + except KeyError: + pass + + return log_func(msg, msg_data) diff --git a/reddwarf/rpc/impl_kombu.py b/reddwarf/rpc/impl_kombu.py new file mode 100644 index 0000000000..4ad984c066 --- /dev/null +++ b/reddwarf/rpc/impl_kombu.py @@ -0,0 +1,687 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import itertools +import socket +import ssl +import sys +import time +import logging +import uuid + +import eventlet +import greenlet +import kombu +import kombu.entity +import kombu.messaging +import kombu.connection + +from reddwarf.common import config +from reddwarf.rpc import amqp as rpc_amqp +from reddwarf.rpc import common as rpc_common + +LOG = logging.getLogger(__name__) +SSL_VERSION = "SSLv2" + +class ConsumerBase(object): + """Consumer base class.""" + + def __init__(self, channel, callback, tag, **kwargs): + """Declare a queue on an amqp channel. + + 'channel' is the amqp channel to use + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + queue name, exchange name, and other kombu options are + passed in here as a dictionary. + """ + self.callback = callback + self.tag = str(tag) + self.kwargs = kwargs + self.queue = None + self.reconnect(channel) + + def reconnect(self, channel): + """Re-declare the queue after a rabbit reconnect""" + self.channel = channel + self.kwargs['channel'] = channel + self.queue = kombu.entity.Queue(**self.kwargs) + self.queue.declare() + + def consume(self, *args, **kwargs): + """Actually declare the consumer on the amqp channel. This will + start the flow of messages from the queue. Using the + Connection.iterconsume() iterator will process the messages, + calling the appropriate callback. + + If a callback is specified in kwargs, use that. Otherwise, + use the callback passed during __init__() + + If kwargs['nowait'] is True, then this call will block until + a message is read. 
+ + Messages will automatically be acked if the callback doesn't + raise an exception + """ + + options = {'consumer_tag': self.tag} + options['nowait'] = kwargs.get('nowait', False) + callback = kwargs.get('callback', self.callback) + if not callback: + raise ValueError("No callback defined") + + def _callback(raw_message): + message = self.channel.message_to_python(raw_message) + callback(message.payload) + message.ack() + + self.queue.consume(*args, callback=_callback, **options) + + def cancel(self): + """Cancel the consuming from the queue, if it has started""" + try: + self.queue.cancel(self.tag) + except KeyError, e: + # NOTE(comstud): Kludge to get around a amqplib bug + if str(e) != "u'%s'" % self.tag: + raise + self.queue = None + + +class DirectConsumer(ConsumerBase): + """Queue/consumer class for 'direct'""" + + def __init__(self, channel, msg_id, callback, tag, **kwargs): + """Init a 'direct' queue. + + 'channel' is the amqp channel to use + 'msg_id' is the msg_id to listen on + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + Other kombu options may be passed + """ + # Default options + options = {'durable': False, + 'auto_delete': True, + 'exclusive': True} + options.update(kwargs) + exchange = kombu.entity.Exchange( + name=msg_id, + type='direct', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(DirectConsumer, self).__init__( + channel, + callback, + tag, + name=msg_id, + exchange=exchange, + routing_key=msg_id, + **options) + + +class TopicConsumer(ConsumerBase): + """Consumer class for 'topic'""" + + def __init__(self, channel, topic, callback, tag, **kwargs): + """Init a 'topic' queue. + + 'channel' is the amqp channel to use + 'topic' is the topic to listen on + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + Other kombu options may be passed + """ + # Default options + options = {'durable': config.Config.get('rabbit_durable_queues', True), + 'auto_delete': False, + 'exclusive': False} + options.update(kwargs) + exchange = kombu.entity.Exchange( + name=config.Config.get('control_exchange', 'nova'), + type='topic', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(TopicConsumer, self).__init__( + channel, + callback, + tag, + name=topic, + exchange=exchange, + routing_key=topic, + **options) + + +class FanoutConsumer(ConsumerBase): + """Consumer class for 'fanout'""" + + def __init__(self, channel, topic, callback, tag, **kwargs): + """Init a 'fanout' queue. 
+ + 'channel' is the amqp channel to use + 'topic' is the topic to listen on + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + Other kombu options may be passed + """ + unique = uuid.uuid4().hex + exchange_name = '%s_fanout' % topic + queue_name = '%s_fanout_%s' % (topic, unique) + + # Default options + options = {'durable': False, + 'auto_delete': True, + 'exclusive': True} + options.update(kwargs) + exchange = kombu.entity.Exchange( + name=exchange_name, + type='fanout', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(FanoutConsumer, self).__init__( + channel, + callback, + tag, + name=queue_name, + exchange=exchange, + routing_key=topic, + **options) + + +class Publisher(object): + """Base Publisher class""" + + def __init__(self, channel, exchange_name, routing_key, **kwargs): + """Init the Publisher class with the exchange_name, routing_key, + and other options + """ + self.exchange_name = exchange_name + self.routing_key = routing_key + self.kwargs = kwargs + self.reconnect(channel) + + def reconnect(self, channel): + """Re-establish the Producer after a rabbit reconnection""" + self.exchange = kombu.entity.Exchange(name=self.exchange_name, + **self.kwargs) + self.producer = kombu.messaging.Producer(exchange=self.exchange, + channel=channel, routing_key=self.routing_key) + + def send(self, msg): + """Send a message""" + LOG.info("send %s" % self.producer) + LOG.info("%s %s %s" % (self.exchange_name,self.routing_key,self.kwargs)) + self.producer.publish(msg) + + +class DirectPublisher(Publisher): + """Publisher class for 'direct'""" + def __init__(self, channel, msg_id, **kwargs): + """init a 'direct' publisher. + + Kombu options may be passed as keyword args to override defaults + """ + + options = {'durable': False, + 'auto_delete': True, + 'exclusive': True} + options.update(kwargs) + super(DirectPublisher, self).__init__(channel, + msg_id, + msg_id, + type='direct', + **options) + + +class TopicPublisher(Publisher): + """Publisher class for 'topic'""" + def __init__(self, channel, topic, **kwargs): + """init a 'topic' publisher. + + Kombu options may be passed as keyword args to override defaults + """ + options = {'durable': config.Config.get('rabbit_durable_queues', False), + 'auto_delete': False, + 'exclusive': False} + options.update(kwargs) + super(TopicPublisher, self).__init__(channel, + config.Config.get('control_exchange', 'nova'), + topic, + type='topic', + **options) + + +class FanoutPublisher(Publisher): + """Publisher class for 'fanout'""" + def __init__(self, channel, topic, **kwargs): + """init a 'fanout' publisher. 
+ + Kombu options may be passed as keyword args to override defaults + """ + options = {'durable': False, + 'auto_delete': True, + 'exclusive': True} + options.update(kwargs) + super(FanoutPublisher, self).__init__(channel, + '%s_fanout' % topic, + None, + type='fanout', + **options) + + +class NotifyPublisher(TopicPublisher): + """Publisher class for 'notify'""" + + def __init__(self, *args, **kwargs): + self.durable = kwargs.pop('durable', config.Config.get('rabbit_durable_queues', False)) + super(NotifyPublisher, self).__init__(*args, **kwargs) + + def reconnect(self, channel): + super(NotifyPublisher, self).reconnect(channel) + + # NOTE(jerdfelt): Normally the consumer would create the queue, but + # we do this to ensure that messages don't get dropped if the + # consumer is started after we do + queue = kombu.entity.Queue(channel=channel, + exchange=self.exchange, + durable=self.durable, + name=self.routing_key, + routing_key=self.routing_key) + queue.declare() + + +class Connection(object): + """Connection object.""" + + def __init__(self, server_params=None): + self.consumers = [] + self.consumer_thread = None + self.max_retries = config.Config.get('rabbit_max_retries', 0) + # Try forever? + if self.max_retries <= 0: + self.max_retries = None + self.interval_start = config.Config.get('rabbit_retry_interval',1) + self.interval_stepping = config.Config.get('rabbit_retry_backoff', 2) + # max retry-interval = 30 seconds + self.interval_max = 30 + self.memory_transport = False + + if server_params is None: + server_params = {} + + # Keys to translate from server_params to kombu params + server_params_to_kombu_params = {'username': 'userid'} + + params = {} + for sp_key, value in server_params.iteritems(): + p_key = server_params_to_kombu_params.get(sp_key, sp_key) + params[p_key] = value + + params.setdefault('hostname', config.Config.get('rabbit_host','localhost')) + params.setdefault('port', config.Config.get('rabbit_port',5672)) + params.setdefault('userid', config.Config.get('rabbit_userid','guest')) + params.setdefault('password', config.Config.get('rabbit_password','f7999d1955c5014aa32c')) + params.setdefault('virtual_host', config.Config.get('rabbit_virtual_host','/')) + + self.params = params + + if config.Config.get('fake_rabbit',False): + self.params['transport'] = 'memory' + self.memory_transport = True + else: + self.memory_transport = False + + if config.Config.get('rabbit_use_ssl', False): + self.params['ssl'] = self._fetch_ssl_params() + + self.connection = None + self.reconnect() + + def _fetch_ssl_params(self): + """Handles fetching what ssl params + should be used for the connection (if any)""" + ssl_params = dict() + + # http://docs.python.org/library/ssl.html - ssl.wrap_socket + if config.Config.get('kombu_ssl_version'): + ssl_params['ssl_version'] = config.Config.get('kombu_ssl_version') + if config.Config.get('kombu_ssl_keyfile'): + ssl_params['keyfile'] = config.Config.get('kombu_ssl_keyfile') + if config.Config.get('kombu_ssl_certfile'): + ssl_params['certfile'] = config.Config.get('kombu_ssl_certfile') + if config.Config.get('kombu_ssl_ca_certs'): + ssl_params['ca_certs'] = config.Config.get('kombu_ssl_ca_certs') + # We might want to allow variations in the + # future with this? + ssl_params['cert_reqs'] = ssl.CERT_REQUIRED + + if not ssl_params: + # Just have the default behavior + return True + else: + # Return the extended behavior + return ssl_params + + def _connect(self): + """Connect to rabbit. 
Re-establish any queues that may have + been declared before if we are reconnecting. Exceptions should + be handled by the caller. + """ + if self.connection: + LOG.info(_("Reconnecting to AMQP server on " + "%(hostname)s:%(port)d") % self.params) + try: + self.connection.close() + except self.connection_errors: + pass + # Setting this in case the next statement fails, though + # it shouldn't be doing any network operations, yet. + self.connection = None + self.connection = kombu.connection.BrokerConnection( + **self.params) + self.connection_errors = self.connection.connection_errors + if self.memory_transport: + # Kludge to speed up tests. + self.connection.transport.polling_interval = 0.0 + self.consumer_num = itertools.count(1) + self.connection.connect() + self.channel = self.connection.channel() + # work around 'memory' transport bug in 1.1.3 + if self.memory_transport: + self.channel._new_queue('ae.undeliver') + for consumer in self.consumers: + consumer.reconnect(self.channel) + LOG.info(_('Connected to AMQP server on ' + '%(hostname)s:%(port)d' % self.params)) + + def reconnect(self): + """Handles reconnecting and re-establishing queues. + Will retry up to self.max_retries number of times. + self.max_retries = 0 means to retry forever. + Sleep between tries, starting at self.interval_start + seconds, backing off self.interval_stepping number of seconds + each attempt. + """ + + attempt = 0 + while True: + attempt += 1 + try: + self._connect() + return + except self.connection_errors, e: + pass + except Exception, e: + # NOTE(comstud): Unfortunately it's possible for amqplib + # to return an error not covered by its transport + # connection_errors in the case of a timeout waiting for + # a protocol response. (See paste link in LP888621) + # So, we check all exceptions for 'timeout' in them + # and try to reconnect in this case. + if 'timeout' not in str(e): + raise + + log_info = {} + log_info['err_str'] = str(e) + log_info['max_retries'] = self.max_retries + log_info.update(self.params) + + if self.max_retries and attempt == self.max_retries: + LOG.exception(_('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info) + # NOTE(comstud): Copied from original code. There's + # really no better recourse because if this was a queue we + # need to consume on, we have no way to consume anymore. + sys.exit(1) + + if attempt == 1: + sleep_time = self.interval_start or 1 + elif attempt > 1: + sleep_time += self.interval_stepping + if self.interval_max: + sleep_time = min(sleep_time, self.interval_max) + + log_info['sleep_time'] = sleep_time + LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' + ' unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) + time.sleep(sleep_time) + + def ensure(self, error_callback, method, *args, **kwargs): + while True: + try: + return method(*args, **kwargs) + except (self.connection_errors, socket.timeout), e: + pass + except Exception, e: + # NOTE(comstud): Unfortunately it's possible for amqplib + # to return an error not covered by its transport + # connection_errors in the case of a timeout waiting for + # a protocol response. (See paste link in LP888621) + # So, we check all exceptions for 'timeout' in them + # and try to reconnect in this case. 
+ if 'timeout' not in str(e): + raise + if error_callback: + error_callback(e) + self.reconnect() + + def get_channel(self): + """Convenience call for bin/clear_rabbit_queues""" + return self.channel + + def close(self): + """Close/release this connection""" + self.cancel_consumer_thread() + self.connection.release() + self.connection = None + + def reset(self): + """Reset a connection so it can be used again""" + self.cancel_consumer_thread() + self.channel.close() + self.channel = self.connection.channel() + # work around 'memory' transport bug in 1.1.3 + if self.memory_transport: + self.channel._new_queue('ae.undeliver') + self.consumers = [] + + def declare_consumer(self, consumer_cls, topic, callback): + """Create a Consumer using the class that was passed in and + add it to our list of consumers + """ + + def _connect_error(exc): + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.error(_("Failed to declare consumer for topic '%(topic)s': " + "%(err_str)s") % log_info) + + def _declare_consumer(): + consumer = consumer_cls(self.channel, topic, callback, + self.consumer_num.next()) + self.consumers.append(consumer) + return consumer + + return self.ensure(_connect_error, _declare_consumer) + + def iterconsume(self, limit=None, timeout=None): + """Return an iterator that will consume from all queues/consumers""" + + info = {'do_consume': True} + + def _error_callback(exc): + if isinstance(exc, socket.timeout): + LOG.exception(_('Timed out waiting for RPC response: %s') % + str(exc)) + raise rpc_common.Timeout() + else: + LOG.exception(_('Failed to consume message from queue: %s') % + str(exc)) + info['do_consume'] = True + + def _consume(): + if info['do_consume']: + queues_head = self.consumers[:-1] + queues_tail = self.consumers[-1] + for queue in queues_head: + queue.consume(nowait=True) + queues_tail.consume(nowait=False) + info['do_consume'] = False + return self.connection.drain_events(timeout=timeout) + + for iteration in itertools.count(0): + if limit and iteration >= limit: + raise StopIteration + yield self.ensure(_error_callback, _consume) + + def cancel_consumer_thread(self): + """Cancel a consumer thread""" + if self.consumer_thread is not None: + self.consumer_thread.kill() + try: + self.consumer_thread.wait() + except greenlet.GreenletExit: + pass + self.consumer_thread = None + + def publisher_send(self, cls, topic, msg, **kwargs): + """Send to a publisher based on the publisher class""" + + def _error_callback(exc): + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.exception(_("Failed to publish message to topic " + "'%(topic)s': %(err_str)s") % log_info) + + def _publish(): + publisher = cls(self.channel, topic, **kwargs) + LOG.info("_publish info%s %s %s %s" % (self.channel, topic, kwargs,publisher)) + publisher.send(msg) + + self.ensure(_error_callback, _publish) + + def declare_direct_consumer(self, topic, callback): + """Create a 'direct' queue. 
+ In nova's use, this is generally a msg_id queue used for + responses for call/multicall + """ + self.declare_consumer(DirectConsumer, topic, callback) + + def declare_topic_consumer(self, topic, callback=None): + """Create a 'topic' consumer.""" + self.declare_consumer(TopicConsumer, topic, callback) + + def declare_fanout_consumer(self, topic, callback): + """Create a 'fanout' consumer""" + self.declare_consumer(FanoutConsumer, topic, callback) + + def direct_send(self, msg_id, msg): + """Send a 'direct' message""" + self.publisher_send(DirectPublisher, msg_id, msg) + + def topic_send(self, topic, msg): + """Send a 'topic' message""" + self.publisher_send(TopicPublisher, topic, msg) + + def fanout_send(self, topic, msg): + """Send a 'fanout' message""" + self.publisher_send(FanoutPublisher, topic, msg) + + def notify_send(self, topic, msg, **kwargs): + """Send a notify message on a topic""" + self.publisher_send(NotifyPublisher, topic, msg, **kwargs) + + def consume(self, limit=None): + """Consume from all queues/consumers""" + it = self.iterconsume(limit=limit) + while True: + try: + it.next() + except StopIteration: + return + + def consume_in_thread(self): + """Consumer from all queues/consumers in a greenthread""" + def _consumer_thread(): + try: + self.consume() + except greenlet.GreenletExit: + return + if self.consumer_thread is None: + self.consumer_thread = eventlet.spawn(_consumer_thread) + return self.consumer_thread + + def create_consumer(self, topic, proxy, fanout=False): + """Create a consumer that calls a method in a proxy object""" + if fanout: + self.declare_fanout_consumer(topic, + rpc_amqp.ProxyCallback(proxy, Connection.pool)) + else: + self.declare_topic_consumer(topic, + rpc_amqp.ProxyCallback(proxy, Connection.pool)) + + +Connection.pool = rpc_amqp.Pool(connection_cls=Connection) + + +def create_connection(new=True): + """Create a connection""" + return rpc_amqp.create_connection(new, Connection.pool) + + +def multicall(context, topic, msg, timeout=None): + """Make a call that returns multiple times.""" + return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool) + + +def call(context, topic, msg, timeout=None): + """Sends a message on a topic and wait for a response.""" + return rpc_amqp.call(context, topic, msg, timeout, Connection.pool) + + +def cast(context, topic, msg): + """Sends a message on a topic without waiting for a response.""" + return rpc_amqp.cast(context, topic, msg, Connection.pool) + + +def fanout_cast(context, topic, msg): + """Sends a message on a fanout exchange without waiting for a response.""" + return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) + + +def cast_to_server(context, server_params, topic, msg): + """Sends a message on a topic to a specific server.""" + return rpc_amqp.cast_to_server(context, server_params, topic, msg, + Connection.pool) + + +def fanout_cast_to_server(context, server_params, topic, msg): + """Sends a message on a fanout exchange to a specific server.""" + return rpc_amqp.cast_to_server(context, server_params, topic, msg, + Connection.pool) + + +def notify(context, topic, msg): + """Sends a notification event on a topic.""" + return rpc_amqp.notify(context, topic, msg, Connection.pool) + + +def cleanup(): + return rpc_amqp.cleanup(Connection.pool) diff --git a/reddwarf/rpc/impl_qpid.py b/reddwarf/rpc/impl_qpid.py new file mode 100644 index 0000000000..2e71d470d4 --- /dev/null +++ b/reddwarf/rpc/impl_qpid.py @@ -0,0 +1,548 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 
2011 OpenStack LLC +# Copyright 2011 - 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import itertools +import time +import uuid +import json + +import eventlet +import greenlet +import qpid.messaging +import qpid.messaging.exceptions + +from nova import flags +from nova.openstack.common import cfg +from nova.rpc import amqp as rpc_amqp +from nova.rpc import common as rpc_common +from nova.rpc.common import LOG + + +qpid_opts = [ + cfg.StrOpt('qpid_hostname', + default='localhost', + help='Qpid broker hostname'), + cfg.StrOpt('qpid_port', + default='5672', + help='Qpid broker port'), + cfg.StrOpt('qpid_username', + default='', + help='Username for qpid connection'), + cfg.StrOpt('qpid_password', + default='', + help='Password for qpid connection'), + cfg.StrOpt('qpid_sasl_mechanisms', + default='', + help='Space separated list of SASL mechanisms to use for auth'), + cfg.BoolOpt('qpid_reconnect', + default=True, + help='Automatically reconnect'), + cfg.IntOpt('qpid_reconnect_timeout', + default=0, + help='Reconnection timeout in seconds'), + cfg.IntOpt('qpid_reconnect_limit', + default=0, + help='Max reconnections before giving up'), + cfg.IntOpt('qpid_reconnect_interval_min', + default=0, + help='Minimum seconds between reconnection attempts'), + cfg.IntOpt('qpid_reconnect_interval_max', + default=0, + help='Maximum seconds between reconnection attempts'), + cfg.IntOpt('qpid_reconnect_interval', + default=0, + help='Equivalent to setting max and min to the same value'), + cfg.IntOpt('qpid_heartbeat', + default=5, + help='Seconds between connection keepalive heartbeats'), + cfg.StrOpt('qpid_protocol', + default='tcp', + help="Transport to use, either 'tcp' or 'ssl'"), + cfg.BoolOpt('qpid_tcp_nodelay', + default=True, + help='Disable Nagle algorithm'), + ] + +FLAGS = flags.FLAGS +FLAGS.register_opts(qpid_opts) + + +class ConsumerBase(object): + """Consumer base class.""" + + def __init__(self, session, callback, node_name, node_opts, + link_name, link_opts): + """Declare a queue on an amqp session. + + 'session' is the amqp session to use + 'callback' is the callback to call when messages are received + 'node_name' is the first part of the Qpid address string, before ';' + 'node_opts' will be applied to the "x-declare" section of "node" + in the address string. + 'link_name' goes into the "name" field of the "link" in the address + string + 'link_opts' will be applied to the "x-declare" section of "link" + in the address string. 
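+
+        For illustration, the resulting address string has this general shape
+        (values are examples only):
+
+            'nova/compute ; {"create": "always", "node": {...}, "link": {...}}'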
+ """ + self.callback = callback + self.receiver = None + self.session = None + + addr_opts = { + "create": "always", + "node": { + "type": "topic", + "x-declare": { + "durable": True, + "auto-delete": True, + }, + }, + "link": { + "name": link_name, + "durable": True, + "x-declare": { + "durable": False, + "auto-delete": True, + "exclusive": False, + }, + }, + } + addr_opts["node"]["x-declare"].update(node_opts) + addr_opts["link"]["x-declare"].update(link_opts) + + self.address = "%s ; %s" % (node_name, json.dumps(addr_opts)) + + self.reconnect(session) + + def reconnect(self, session): + """Re-declare the receiver after a qpid reconnect""" + self.session = session + self.receiver = session.receiver(self.address) + self.receiver.capacity = 1 + + def consume(self): + """Fetch the message and pass it to the callback object""" + message = self.receiver.fetch() + self.callback(message.content) + + def get_receiver(self): + return self.receiver + + +class DirectConsumer(ConsumerBase): + """Queue/consumer class for 'direct'""" + + def __init__(self, session, msg_id, callback): + """Init a 'direct' queue. + + 'session' is the amqp session to use + 'msg_id' is the msg_id to listen on + 'callback' is the callback to call when messages are received + """ + + super(DirectConsumer, self).__init__(session, callback, + "%s/%s" % (msg_id, msg_id), + {"type": "direct"}, + msg_id, + {"exclusive": True}) + + +class TopicConsumer(ConsumerBase): + """Consumer class for 'topic'""" + + def __init__(self, session, topic, callback): + """Init a 'topic' queue. + + 'session' is the amqp session to use + 'topic' is the topic to listen on + 'callback' is the callback to call when messages are received + """ + + super(TopicConsumer, self).__init__(session, callback, + "%s/%s" % (FLAGS.control_exchange, topic), {}, + topic, {}) + + +class FanoutConsumer(ConsumerBase): + """Consumer class for 'fanout'""" + + def __init__(self, session, topic, callback): + """Init a 'fanout' queue. + + 'session' is the amqp session to use + 'topic' is the topic to listen on + 'callback' is the callback to call when messages are received + """ + + super(FanoutConsumer, self).__init__(session, callback, + "%s_fanout" % topic, + {"durable": False, "type": "fanout"}, + "%s_fanout_%s" % (topic, uuid.uuid4().hex), + {"exclusive": True}) + + +class Publisher(object): + """Base Publisher class""" + + def __init__(self, session, node_name, node_opts=None): + """Init the Publisher class with the exchange_name, routing_key, + and other options + """ + self.sender = None + self.session = session + + addr_opts = { + "create": "always", + "node": { + "type": "topic", + "x-declare": { + "durable": False, + # auto-delete isn't implemented for exchanges in qpid, + # but put in here anyway + "auto-delete": True, + }, + }, + } + if node_opts: + addr_opts["node"]["x-declare"].update(node_opts) + + self.address = "%s ; %s" % (node_name, json.dumps(addr_opts)) + + self.reconnect(session) + + def reconnect(self, session): + """Re-establish the Sender after a reconnection""" + self.sender = session.sender(self.address) + + def send(self, msg): + """Send a message""" + self.sender.send(msg) + + +class DirectPublisher(Publisher): + """Publisher class for 'direct'""" + def __init__(self, session, msg_id): + """Init a 'direct' publisher.""" + super(DirectPublisher, self).__init__(session, msg_id, + {"type": "Direct"}) + + +class TopicPublisher(Publisher): + """Publisher class for 'topic'""" + def __init__(self, session, topic): + """init a 'topic' publisher. 
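+
+        Messages are published to the exchange named by
+        FLAGS.control_exchange, with 'topic' carried in the Qpid address as
+        '<control_exchange>/<topic>' (the subject part of the address).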
+        """
+        super(TopicPublisher, self).__init__(session,
+                        "%s/%s" % (FLAGS.control_exchange, topic))
+
+
+class FanoutPublisher(Publisher):
+    """Publisher class for 'fanout'"""
+    def __init__(self, session, topic):
+        """init a 'fanout' publisher.
+        """
+        super(FanoutPublisher, self).__init__(session,
+                        "%s_fanout" % topic, {"type": "fanout"})
+
+
+class NotifyPublisher(Publisher):
+    """Publisher class for notifications"""
+    def __init__(self, session, topic):
+        """init a 'topic' publisher.
+        """
+        super(NotifyPublisher, self).__init__(session,
+                        "%s/%s" % (FLAGS.control_exchange, topic),
+                        {"durable": True})
+
+
+class Connection(object):
+    """Connection object."""
+
+    def __init__(self, server_params=None):
+        self.session = None
+        self.consumers = {}
+        self.consumer_thread = None
+
+        if server_params is None:
+            server_params = {}
+
+        default_params = dict(hostname=FLAGS.qpid_hostname,
+                port=FLAGS.qpid_port,
+                username=FLAGS.qpid_username,
+                password=FLAGS.qpid_password)
+
+        params = server_params
+        for key in default_params.keys():
+            params.setdefault(key, default_params[key])
+
+        self.broker = params['hostname'] + ":" + str(params['port'])
+        # Create the connection - this does not open the connection
+        self.connection = qpid.messaging.Connection(self.broker)
+
+        # Check if flags are set and if so set them for the connection
+        # before we call open
+        self.connection.username = params['username']
+        self.connection.password = params['password']
+        self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms
+        self.connection.reconnect = FLAGS.qpid_reconnect
+        if FLAGS.qpid_reconnect_timeout:
+            self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout
+        if FLAGS.qpid_reconnect_limit:
+            self.connection.reconnect_limit = FLAGS.qpid_reconnect_limit
+        if FLAGS.qpid_reconnect_interval_max:
+            self.connection.reconnect_interval_max = (
+                    FLAGS.qpid_reconnect_interval_max)
+        if FLAGS.qpid_reconnect_interval_min:
+            self.connection.reconnect_interval_min = (
+                    FLAGS.qpid_reconnect_interval_min)
+        if FLAGS.qpid_reconnect_interval:
+            self.connection.reconnect_interval = FLAGS.qpid_reconnect_interval
+        self.connection.heartbeat = FLAGS.qpid_heartbeat
+        self.connection.protocol = FLAGS.qpid_protocol
+        self.connection.tcp_nodelay = FLAGS.qpid_tcp_nodelay
+
+        # Open is part of reconnect -
+        # NOTE(WGH) not sure we need this with the reconnect flags
+        self.reconnect()
+
+    def _register_consumer(self, consumer):
+        self.consumers[str(consumer.get_receiver())] = consumer
+
+    def _lookup_consumer(self, receiver):
+        return self.consumers[str(receiver)]
+
+    def reconnect(self):
+        """Handles reconnecting and re-establishing sessions and queues"""
+        if self.connection.opened():
+            try:
+                self.connection.close()
+            except qpid.messaging.exceptions.ConnectionError:
+                pass
+
+        while True:
+            try:
+                self.connection.open()
+            except qpid.messaging.exceptions.ConnectionError, e:
+                LOG.error(_('Unable to connect to AMQP server: %s') % str(e))
+                time.sleep(FLAGS.qpid_reconnect_interval or 1)
+            else:
+                break
+
+        LOG.info(_('Connected to AMQP server on %s') % self.broker)
+
+        self.session = self.connection.session()
+
+        for consumer in self.consumers.itervalues():
+            consumer.reconnect(self.session)
+
+        if self.consumers:
+            LOG.debug(_("Re-established AMQP queues"))
+
+    def ensure(self, error_callback, method, *args, **kwargs):
+        """Run method(), calling error_callback and reconnecting on failure."""
+        while True:
+            try:
+                return method(*args, **kwargs)
+            except (qpid.messaging.exceptions.Empty,
+                    qpid.messaging.exceptions.ConnectionError), e:
+                if error_callback:
+                    error_callback(e)
+                
self.reconnect() + + def close(self): + """Close/release this connection""" + self.cancel_consumer_thread() + self.connection.close() + self.connection = None + + def reset(self): + """Reset a connection so it can be used again""" + self.cancel_consumer_thread() + self.session.close() + self.session = self.connection.session() + self.consumers = {} + + def declare_consumer(self, consumer_cls, topic, callback): + """Create a Consumer using the class that was passed in and + add it to our list of consumers + """ + def _connect_error(exc): + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.error(_("Failed to declare consumer for topic '%(topic)s': " + "%(err_str)s") % log_info) + + def _declare_consumer(): + consumer = consumer_cls(self.session, topic, callback) + self._register_consumer(consumer) + return consumer + + return self.ensure(_connect_error, _declare_consumer) + + def iterconsume(self, limit=None, timeout=None): + """Return an iterator that will consume from all queues/consumers""" + + def _error_callback(exc): + if isinstance(exc, qpid.messaging.exceptions.Empty): + LOG.exception(_('Timed out waiting for RPC response: %s') % + str(exc)) + raise rpc_common.Timeout() + else: + LOG.exception(_('Failed to consume message from queue: %s') % + str(exc)) + + def _consume(): + nxt_receiver = self.session.next_receiver(timeout=timeout) + self._lookup_consumer(nxt_receiver).consume() + + for iteration in itertools.count(0): + if limit and iteration >= limit: + raise StopIteration + yield self.ensure(_error_callback, _consume) + + def cancel_consumer_thread(self): + """Cancel a consumer thread""" + if self.consumer_thread is not None: + self.consumer_thread.kill() + try: + self.consumer_thread.wait() + except greenlet.GreenletExit: + pass + self.consumer_thread = None + + def publisher_send(self, cls, topic, msg): + """Send to a publisher based on the publisher class""" + + def _connect_error(exc): + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.exception(_("Failed to publish message to topic " + "'%(topic)s': %(err_str)s") % log_info) + + def _publisher_send(): + publisher = cls(self.session, topic) + publisher.send(msg) + + return self.ensure(_connect_error, _publisher_send) + + def declare_direct_consumer(self, topic, callback): + """Create a 'direct' queue. 
+        In nova's use, this is generally a msg_id queue used for
+        responses for call/multicall
+        """
+        self.declare_consumer(DirectConsumer, topic, callback)
+
+    def declare_topic_consumer(self, topic, callback=None):
+        """Create a 'topic' consumer."""
+        self.declare_consumer(TopicConsumer, topic, callback)
+
+    def declare_fanout_consumer(self, topic, callback):
+        """Create a 'fanout' consumer"""
+        self.declare_consumer(FanoutConsumer, topic, callback)
+
+    def direct_send(self, msg_id, msg):
+        """Send a 'direct' message"""
+        self.publisher_send(DirectPublisher, msg_id, msg)
+
+    def topic_send(self, topic, msg):
+        """Send a 'topic' message"""
+        self.publisher_send(TopicPublisher, topic, msg)
+
+    def fanout_send(self, topic, msg):
+        """Send a 'fanout' message"""
+        self.publisher_send(FanoutPublisher, topic, msg)
+
+    def notify_send(self, topic, msg, **kwargs):
+        """Send a notify message on a topic"""
+        self.publisher_send(NotifyPublisher, topic, msg)
+
+    def consume(self, limit=None):
+        """Consume from all queues/consumers"""
+        it = self.iterconsume(limit=limit)
+        while True:
+            try:
+                it.next()
+            except StopIteration:
+                return
+
+    def consume_in_thread(self):
+        """Consume from all queues/consumers in a greenthread"""
+        def _consumer_thread():
+            try:
+                self.consume()
+            except greenlet.GreenletExit:
+                return
+        if self.consumer_thread is None:
+            self.consumer_thread = eventlet.spawn(_consumer_thread)
+        return self.consumer_thread
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        """Create a consumer that calls a method in a proxy object"""
+        if fanout:
+            consumer = FanoutConsumer(self.session, topic,
+                    rpc_amqp.ProxyCallback(proxy, Connection.pool))
+        else:
+            consumer = TopicConsumer(self.session, topic,
+                    rpc_amqp.ProxyCallback(proxy, Connection.pool))
+        self._register_consumer(consumer)
+        return consumer
+
+
+Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
+
+
+def create_connection(new=True):
+    """Create a connection"""
+    return rpc_amqp.create_connection(new, Connection.pool)
+
+
+def multicall(context, topic, msg, timeout=None):
+    """Make a call that returns multiple times."""
+    return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
+
+
+def call(context, topic, msg, timeout=None):
+    """Sends a message on a topic and wait for a response."""
+    return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
+
+
+def cast(context, topic, msg):
+    """Sends a message on a topic without waiting for a response."""
+    return rpc_amqp.cast(context, topic, msg, Connection.pool)
+
+
+def fanout_cast(context, topic, msg):
+    """Sends a message on a fanout exchange without waiting for a response."""
+    return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
+
+
+def cast_to_server(context, server_params, topic, msg):
+    """Sends a message on a topic to a specific server."""
+    return rpc_amqp.cast_to_server(context, server_params, topic, msg,
+                                   Connection.pool)
+
+
+def fanout_cast_to_server(context, server_params, topic, msg):
+    """Sends a message on a fanout exchange to a specific server."""
+    return rpc_amqp.fanout_cast_to_server(context, server_params, topic,
+                                          msg, Connection.pool)
+
+
+def notify(context, topic, msg):
+    """Sends a notification event on a topic."""
+    return rpc_amqp.notify(context, topic, msg, Connection.pool)
+
+
+def cleanup():
+    return rpc_amqp.cleanup(Connection.pool)
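+
+
+# Illustrative usage sketch only -- not exercised by this change. The topic
+# name and message payload below are made-up examples, and this assumes the
+# surrounding reddwarf.rpc abstraction exposes the usual call/cast entry
+# points and dispatches to this module when it is configured as the rpc
+# backend:
+#
+#     from reddwarf import rpc
+#
+#     # given a request context 'ctxt':
+#     rpc.cast(ctxt, 'guestagent.instance-0001',
+#              {'method': 'prepare', 'args': {'databases': []}})
+#     result = rpc.call(ctxt, 'guestagent.instance-0001',
+#                       {'method': 'list_databases', 'args': {}},
+#                       timeout=30)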