Port to oslo.messaging

Move from oslo RPC to oslo.messaging.

Implements: blueprint oslo-messaging
Co-Authored-By: sdake@redhat.com
Change-Id: I2d222c248dd2cd405b8ec35c4c8198ed001fb69f
This commit is contained in:
Thomas Herve 2014-06-11 06:15:38 +02:00
parent 91fddd5c45
commit 9090b988e6
55 changed files with 1344 additions and 7385 deletions

View File

@ -33,6 +33,7 @@ if os.path.exists(os.path.join(possible_topdir, 'heat', '__init__.py')):
from oslo.config import cfg
from heat.common import config
from heat.common import messaging
from heat.common import notify
from heat.common import wsgi
from heat.openstack.common import gettextutils
@ -52,6 +53,7 @@ if __name__ == '__main__':
'eventlet.wsgi.server=WARN',
]
logging.setup('heat')
messaging.setup()
app = config.load_paste_app()

View File

@ -35,6 +35,7 @@ if os.path.exists(os.path.join(possible_topdir, 'heat', '__init__.py')):
from oslo.config import cfg
from heat.common import config
from heat.common import messaging
from heat.common import notify
from heat.common import wsgi
from heat.openstack.common import gettextutils
@ -54,6 +55,7 @@ if __name__ == '__main__':
'eventlet.wsgi.server=WARN',
]
logging.setup('heat')
messaging.setup()
app = config.load_paste_app()

View File

@ -35,6 +35,7 @@ if os.path.exists(os.path.join(possible_topdir, 'heat', '__init__.py')):
from oslo.config import cfg
from heat.common import config
from heat.common import messaging
from heat.common import notify
from heat.common import wsgi
from heat.openstack.common import gettextutils
@ -54,6 +55,7 @@ if __name__ == '__main__':
'eventlet.wsgi.server=WARN',
]
logging.setup('heat')
messaging.setup()
app = config.load_paste_app()

View File

@ -34,6 +34,7 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'heat', '__init__.py')):
from oslo.config import cfg
from heat.common import messaging
from heat.common import notify
from heat.openstack.common import gettextutils
from heat.openstack.common import log as logging
@ -55,6 +56,7 @@ if __name__ == '__main__':
'eventlet.wsgi.server=WARN',
]
logging.setup('heat')
messaging.setup()
from heat.engine import service as engine

View File

@ -139,6 +139,200 @@
#max_json_body_size=1048576
#
# Options defined in oslo.messaging
#
# Use durable queues in amqp. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_durable_queues
#amqp_durable_queues=false
# Auto-delete queues in amqp. (boolean value)
#amqp_auto_delete=false
# Size of RPC connection pool. (integer value)
#rpc_conn_pool_size=30
# Modules of exceptions that are permitted to be recreated
# upon receiving exception data from an rpc call. (list value)
#allowed_rpc_exception_modules=oslo.messaging.exceptions,nova.exception,cinder.exception,exceptions
# Qpid broker hostname. (string value)
#qpid_hostname=localhost
# Qpid broker port. (integer value)
#qpid_port=5672
# Qpid HA cluster host:port pairs. (list value)
#qpid_hosts=$qpid_hostname:$qpid_port
# Username for Qpid connection. (string value)
#qpid_username=
# Password for Qpid connection. (string value)
#qpid_password=
# Space separated list of SASL mechanisms to use for auth.
# (string value)
#qpid_sasl_mechanisms=
# Seconds between connection keepalive heartbeats. (integer
# value)
#qpid_heartbeat=60
# Transport to use, either 'tcp' or 'ssl'. (string value)
#qpid_protocol=tcp
# Whether to disable the Nagle algorithm. (boolean value)
#qpid_tcp_nodelay=true
# The qpid topology version to use. Version 1 is what was
# originally used by impl_qpid. Version 2 includes some
# backwards-incompatible changes that allow broker federation
# to work. Users should update to version 2 when they are
# able to take everything down, as it requires a clean break.
# (integer value)
#qpid_topology_version=1
# SSL version to use (valid only if SSL enabled). valid values
# are TLSv1, SSLv23 and SSLv3. SSLv2 may be available on some
# distributions. (string value)
#kombu_ssl_version=
# SSL key file (valid only if SSL enabled). (string value)
#kombu_ssl_keyfile=
# SSL cert file (valid only if SSL enabled). (string value)
#kombu_ssl_certfile=
# SSL certification authority file (valid only if SSL
# enabled). (string value)
#kombu_ssl_ca_certs=
# How long to wait before reconnecting in response to an AMQP
# consumer cancel notification. (floating point value)
#kombu_reconnect_delay=1.0
# The RabbitMQ broker address where a single node is used.
# (string value)
#rabbit_host=localhost
# The RabbitMQ broker port where a single node is used.
# (integer value)
#rabbit_port=5672
# RabbitMQ HA cluster host:port pairs. (list value)
#rabbit_hosts=$rabbit_host:$rabbit_port
# Connect over SSL for RabbitMQ. (boolean value)
#rabbit_use_ssl=false
# The RabbitMQ userid. (string value)
#rabbit_userid=guest
# The RabbitMQ password. (string value)
#rabbit_password=guest
# the RabbitMQ login method (string value)
#rabbit_login_method=AMQPLAIN
# The RabbitMQ virtual host. (string value)
#rabbit_virtual_host=/
# How frequently to retry connecting with RabbitMQ. (integer
# value)
#rabbit_retry_interval=1
# How long to backoff for between retries when connecting to
# RabbitMQ. (integer value)
#rabbit_retry_backoff=2
# Maximum number of RabbitMQ connection retries. Default is 0
# (infinite retry count). (integer value)
#rabbit_max_retries=0
# Use HA queues in RabbitMQ (x-ha-policy: all). If you change
# this option, you must wipe the RabbitMQ database. (boolean
# value)
#rabbit_ha_queues=false
# If passed, use a fake RabbitMQ provider. (boolean value)
#fake_rabbit=false
# ZeroMQ bind address. Should be a wildcard (*), an ethernet
# interface, or IP. The "host" option should point or resolve
# to this address. (string value)
#rpc_zmq_bind_address=*
# MatchMaker driver. (string value)
#rpc_zmq_matchmaker=oslo.messaging._drivers.matchmaker.MatchMakerLocalhost
# ZeroMQ receiver listening port. (integer value)
#rpc_zmq_port=9501
# Number of ZeroMQ contexts, defaults to 1. (integer value)
#rpc_zmq_contexts=1
# Maximum number of ingress messages to locally buffer per
# topic. Default is unlimited. (integer value)
#rpc_zmq_topic_backlog=<None>
# Directory for holding IPC sockets. (string value)
#rpc_zmq_ipc_dir=/var/run/openstack
# Name of this node. Must be a valid hostname, FQDN, or IP
# address. Must match "host" option, if running Nova. (string
# value)
#rpc_zmq_host=heat
# Seconds to wait before a cast expires (TTL). Only supported
# by impl_zmq. (integer value)
#rpc_cast_timeout=30
# Heartbeat frequency. (integer value)
#matchmaker_heartbeat_freq=300
# Heartbeat time-to-live. (integer value)
#matchmaker_heartbeat_ttl=600
# Host to locate redis. (string value)
#host=127.0.0.1
# Use this port to connect to redis host. (integer value)
#port=6379
# Password for Redis server (optional). (string value)
#password=<None>
# Size of RPC greenthread pool. (integer value)
#rpc_thread_pool_size=64
# Driver or drivers to handle sending notifications. (multi
# valued)
#notification_driver=
# AMQP topic used for OpenStack notifications. (list value)
# Deprecated group/name - [rpc_notifier2]/topics
#notification_topics=notifications
# Seconds to wait for a response from a call. (integer value)
#rpc_response_timeout=60
# A URL representing the messaging driver to use and its full
# configuration. If not set, we fall back to the rpc_backend
# option and driver specific configuration. (string value)
#transport_url=<None>
# The messaging driver to use, defaults to rabbit. Other
# drivers include qpid and zmq. (string value)
#rpc_backend=rabbit
# The default exchange under which topics are scoped. May be
# overridden by an exchange name specified in the
# transport_url option. (string value)
#control_exchange=openstack
#
# Options defined in heat.api.middleware.ssl
#
@ -176,6 +370,23 @@
#cloud_backend=heat.engine.clients.OpenStackClients
#
# Options defined in heat.engine.notification
#
# Default notification level for outgoing notifications
# (string value)
#default_notification_level=INFO
# Default publisher_id for outgoing notifications (string
# value)
#default_publisher_id=<None>
# List of drivers to send notifications (DEPRECATED) (multi
# valued)
#list_notifier_drivers=<None>
#
# Options defined in heat.engine.resources.loadbalancer
#
@ -305,39 +516,6 @@
#syslog_log_facility=LOG_USER
#
# Options defined in heat.openstack.common.notifier.api
#
# Driver or drivers to handle sending notifications (multi
# valued)
#notification_driver=
# Default notification level for outgoing notifications
# (string value)
#default_notification_level=INFO
# Default publisher_id for outgoing notifications (string
# value)
#default_publisher_id=<None>
#
# Options defined in heat.openstack.common.notifier.list_notifier
#
# List of drivers to send notifications (multi valued)
#list_notifier_drivers=heat.openstack.common.notifier.no_op_notifier
#
# Options defined in heat.openstack.common.notifier.rpc_notifier
#
# AMQP topic used for OpenStack notifications (list value)
#notification_topics=notifications
#
# Options defined in heat.openstack.common.policy
#
@ -350,200 +528,6 @@
#policy_default_rule=default
#
# Options defined in heat.openstack.common.rpc
#
# The messaging module to use, defaults to kombu. (string
# value)
#rpc_backend=heat.openstack.common.rpc.impl_kombu
# Size of RPC thread pool (integer value)
#rpc_thread_pool_size=64
# Size of RPC connection pool (integer value)
#rpc_conn_pool_size=30
# Seconds to wait for a response from call or multicall
# (integer value)
#rpc_response_timeout=60
# Seconds to wait before a cast expires (TTL). Only supported
# by impl_zmq. (integer value)
#rpc_cast_timeout=30
# Modules of exceptions that are permitted to be recreated
# upon receiving exception data from an rpc call. (list value)
#allowed_rpc_exception_modules=nova.exception,cinder.exception,exceptions
# If passed, use a fake RabbitMQ provider (boolean value)
#fake_rabbit=false
# AMQP exchange to connect to if using RabbitMQ or Qpid
# (string value)
#control_exchange=heat
#
# Options defined in heat.openstack.common.rpc.amqp
#
# Use durable queues in amqp. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_durable_queues
#amqp_durable_queues=false
# Auto-delete queues in amqp. (boolean value)
#amqp_auto_delete=false
#
# Options defined in heat.openstack.common.rpc.impl_kombu
#
# If SSL is enabled, the SSL version to use. Valid values are
# TLSv1, SSLv23 and SSLv3. SSLv2 might be available on some
# distributions. (string value)
#kombu_ssl_version=
# SSL key file (valid only if SSL enabled) (string value)
#kombu_ssl_keyfile=
# SSL cert file (valid only if SSL enabled) (string value)
#kombu_ssl_certfile=
# SSL certification authority file (valid only if SSL enabled)
# (string value)
#kombu_ssl_ca_certs=
# How long to wait before reconnecting in response to an AMQP
# consumer cancel notification. (floating point value)
#kombu_reconnect_delay=1.0
# The RabbitMQ broker address where a single node is used
# (string value)
#rabbit_host=localhost
# The RabbitMQ broker port where a single node is used
# (integer value)
#rabbit_port=5672
# RabbitMQ HA cluster host:port pairs (list value)
#rabbit_hosts=$rabbit_host:$rabbit_port
# Connect over SSL for RabbitMQ (boolean value)
#rabbit_use_ssl=false
# The RabbitMQ userid (string value)
#rabbit_userid=guest
# The RabbitMQ password (string value)
#rabbit_password=guest
# The RabbitMQ virtual host (string value)
#rabbit_virtual_host=/
# How frequently to retry connecting with RabbitMQ (integer
# value)
#rabbit_retry_interval=1
# How long to backoff for between retries when connecting to
# RabbitMQ (integer value)
#rabbit_retry_backoff=2
# Maximum number of RabbitMQ connection retries. Default is 0
# (infinite retry count) (integer value)
#rabbit_max_retries=0
# Use HA queues in RabbitMQ (x-ha-policy: all). If you change
# this option, you must wipe the RabbitMQ database. (boolean
# value)
#rabbit_ha_queues=false
#
# Options defined in heat.openstack.common.rpc.impl_qpid
#
# Qpid broker hostname (string value)
#qpid_hostname=localhost
# Qpid broker port (integer value)
#qpid_port=5672
# Qpid HA cluster host:port pairs (list value)
#qpid_hosts=$qpid_hostname:$qpid_port
# Username for qpid connection (string value)
#qpid_username=
# Password for qpid connection (string value)
#qpid_password=
# Space separated list of SASL mechanisms to use for auth
# (string value)
#qpid_sasl_mechanisms=
# Seconds between connection keepalive heartbeats (integer
# value)
#qpid_heartbeat=60
# Transport to use, either 'tcp' or 'ssl' (string value)
#qpid_protocol=tcp
# Disable Nagle algorithm (boolean value)
#qpid_tcp_nodelay=true
# The qpid topology version to use. Version 1 is what was
# originally used by impl_qpid. Version 2 includes some
# backwards-incompatible changes that allow broker federation
# to work. Users should update to version 2 when they are
# able to take everything down, as it requires a clean break.
# (integer value)
#qpid_topology_version=1
#
# Options defined in heat.openstack.common.rpc.impl_zmq
#
# ZeroMQ bind address. Should be a wildcard (*), an ethernet
# interface, or IP. The "host" option should point or resolve
# to this address. (string value)
#rpc_zmq_bind_address=*
# MatchMaker driver (string value)
#rpc_zmq_matchmaker=heat.openstack.common.rpc.matchmaker.MatchMakerLocalhost
# ZeroMQ receiver listening port (integer value)
#rpc_zmq_port=9501
# Number of ZeroMQ contexts, defaults to 1 (integer value)
#rpc_zmq_contexts=1
# Maximum number of ingress messages to locally buffer per
# topic. Default is unlimited. (integer value)
#rpc_zmq_topic_backlog=<None>
# Directory for holding IPC sockets (string value)
#rpc_zmq_ipc_dir=/var/run/openstack
# Name of this node. Must be a valid hostname, FQDN, or IP
# address. Must match "host" option, if running Nova. (string
# value)
#rpc_zmq_host=heat
#
# Options defined in heat.openstack.common.rpc.matchmaker
#
# Heartbeat frequency (integer value)
#matchmaker_heartbeat_freq=300
# Heartbeat time-to-live. (integer value)
#matchmaker_heartbeat_ttl=600
[auth_password]
#
@ -1226,29 +1210,13 @@
#hash_algorithms=md5
[matchmaker_redis]
#
# Options defined in heat.openstack.common.rpc.matchmaker_redis
#
# Host to locate redis (string value)
#host=127.0.0.1
# Use this port to connect to redis host. (integer value)
#port=6379
# Password for Redis server. (optional) (string value)
#password=<None>
[matchmaker_ring]
#
# Options defined in heat.openstack.common.rpc.matchmaker_ring
# Options defined in oslo.messaging
#
# Matchmaker ring file (JSON) (string value)
# Matchmaker ring file (JSON). (string value)
# Deprecated group/name - [DEFAULT]/matchmaker_ringfile
#ringfile=/etc/oslo/matchmaker_ring.json
@ -1279,13 +1247,3 @@
#heat_revision=unknown
[rpc_notifier2]
#
# Options defined in heat.openstack.common.notifier.rpc_notifier2
#
# AMQP topic(s) used for OpenStack notifications (list value)
#topics=notifications

View File

@ -21,7 +21,6 @@ import webob.exc
from heat.common import serializers
from heat.openstack.common.gettextutils import _
from heat.openstack.common.rpc import common as rpc_common
class HeatAPIException(webob.exc.HTTPError):
@ -262,7 +261,7 @@ class HeatActionInProgressError(HeatAPIException):
def map_remote_error(ex):
"""
Map rpc_common.RemoteError exceptions returned by the engine
Map RemoteError exceptions returned by the engine
to HeatAPIException subclasses which can be used to return
properly formatted AWS error responses
"""
@ -290,8 +289,8 @@ def map_remote_error(ex):
ex_type = ex.__class__.__name__
if ex_type.endswith(rpc_common._REMOTE_POSTFIX):
ex_type = ex_type[:-len(rpc_common._REMOTE_POSTFIX)]
if ex_type.endswith('_Remote'):
ex_type = ex_type[:-len('_Remote')]
if ex_type in inval_param_errors:
return HeatInvalidParameterValueError(detail=six.text_type(ex))

View File

@ -14,6 +14,8 @@
"""
endpoint for heat AWS-compatible CloudWatch API
"""
from oslo import messaging
from heat.api.aws import exception
from heat.api.aws import utils as api_utils
from heat.common import exception as heat_exception
@ -21,7 +23,6 @@ from heat.common import policy
from heat.common import wsgi
from heat.openstack.common.gettextutils import _
from heat.openstack.common import log as logging
from heat.openstack.common.rpc import common as rpc_common
from heat.rpc import api as engine_api
from heat.rpc import client as rpc_client
@ -137,7 +138,7 @@ class WatchController(object):
try:
watch_list = self.rpc_client.show_watch(con, watch_name=name)
except rpc_common.RemoteError as ex:
except messaging.RemoteError as ex:
return exception.map_remote_error(ex)
res = {'MetricAlarms': [format_metric_alarm(a)
@ -229,7 +230,7 @@ class WatchController(object):
'metric_name': None}
watch_data = self.rpc_client.show_watch_metric(con,
**null_kwargs)
except rpc_common.RemoteError as ex:
except messaging.RemoteError as ex:
return exception.map_remote_error(ex)
res = {'Metrics': []}
@ -292,7 +293,7 @@ class WatchController(object):
try:
self.rpc_client.create_watch_data(con, watch_name, data)
except rpc_common.RemoteError as ex:
except messaging.RemoteError as ex:
return exception.map_remote_error(ex)
result = {'ResponseMetadata': None}
@ -329,7 +330,7 @@ class WatchController(object):
try:
self.rpc_client.set_watch_state(con, watch_name=name,
state=state_map[state])
except rpc_common.RemoteError as ex:
except messaging.RemoteError as ex:
return exception.map_remote_error(ex)
return api_utils.format_response("SetAlarmState", "")

View File

@ -25,11 +25,9 @@ import traceback
from oslo.config import cfg
import webob
from heat.common import exception
from heat.common import serializers
from heat.common import wsgi
from heat.openstack.common.rpc import common as rpc_common
cfg.CONF.import_opt('debug', 'heat.openstack.common.log')
@ -108,8 +106,8 @@ class FaultWrapper(wsgi.Middleware):
ex_type = ex.__class__.__name__
if ex_type.endswith(rpc_common._REMOTE_POSTFIX):
ex_type = ex_type[:-len(rpc_common._REMOTE_POSTFIX)]
if ex_type.endswith('_Remote'):
ex_type = ex_type[:-len('_Remote')]
full_message = unicode(ex)
if full_message.find('\n') > -1:

View File

@ -22,7 +22,6 @@ from oslo.config import cfg
from heat.common import wsgi
from heat.openstack.common import log as logging
from heat.openstack.common import rpc
paste_deploy_group = cfg.OptGroup('paste_deploy')
paste_deploy_opts = [
@ -214,20 +213,6 @@ cfg.CONF.register_group(revision_group)
for group, opts in list_opts():
cfg.CONF.register_opts(opts, group=group)
rpc.set_defaults(control_exchange='heat')
# A bit of history:
# This was added initially by jianingy, then it got added
# to oslo by Luis. Then it was receintly removed from the
# default list again.
# I am not sure we can (or should) rely on oslo to keep
# our exceptions class in the defaults list.
allowed_rpc_exception_modules = cfg.CONF.allowed_rpc_exception_modules
allowed_rpc_exception_modules.append('heat.common.exception')
cfg.CONF.set_default(name='allowed_rpc_exception_modules',
default=allowed_rpc_exception_modules)
def _get_deployment_flavor():
"""

118
heat/common/messaging.py Normal file
View File

@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
# Copyright 2013 eNovance <licensing@enovance.com>
#
# Author: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo.config import cfg
import oslo.messaging
from heat.common import context
from heat.openstack.common import jsonutils
TRANSPORT = None
NOTIFIER = None
_ALIASES = {
'heat.openstack.common.rpc.impl_kombu': 'rabbit',
'heat.openstack.common.rpc.impl_qpid': 'qpid',
'heat.openstack.common.rpc.impl_zmq': 'zmq',
}
class RequestContextSerializer(oslo.messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.serialize_entity(ctxt, entity)
def deserialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.deserialize_entity(ctxt, entity)
@staticmethod
def serialize_context(ctxt):
return ctxt.to_dict()
@staticmethod
def deserialize_context(ctxt):
return context.RequestContext.from_dict(ctxt)
class JsonPayloadSerializer(oslo.messaging.NoOpSerializer):
@classmethod
def serialize_entity(cls, context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
def setup(url=None, optional=False):
"""Initialise the oslo.messaging layer."""
global TRANSPORT, NOTIFIER
if url and url.startswith("fake://"):
# NOTE(sileht): oslo.messaging fake driver uses time.sleep
# for task switch, so we need to monkey_patch it
eventlet.monkey_patch(time=True)
if not TRANSPORT:
oslo.messaging.set_transport_defaults('heat')
exmods = ['heat.common.exception']
try:
TRANSPORT = oslo.messaging.get_transport(
cfg.CONF, url, allowed_remote_exmods=exmods, aliases=_ALIASES)
except oslo.messaging.InvalidTransportURL as e:
TRANSPORT = None
if not optional or e.url:
# NOTE(sileht): oslo.messaging is configured but unloadable
# so reraise the exception
raise
if not NOTIFIER and TRANSPORT:
serializer = RequestContextSerializer(JsonPayloadSerializer())
NOTIFIER = oslo.messaging.Notifier(TRANSPORT, serializer=serializer)
def cleanup():
"""Cleanup the oslo.messaging layer."""
global TRANSPORT, NOTIFIER
if TRANSPORT:
TRANSPORT.cleanup()
TRANSPORT = NOTIFIER = None
def get_rpc_server(target, endpoint):
"""Return a configured oslo.messaging rpc server."""
serializer = RequestContextSerializer(JsonPayloadSerializer())
return oslo.messaging.get_rpc_server(TRANSPORT, target, [endpoint],
executor='eventlet',
serializer=serializer)
def get_rpc_client(**kwargs):
"""Return a configured oslo.messaging RPCClient."""
target = oslo.messaging.Target(**kwargs)
serializer = RequestContextSerializer(JsonPayloadSerializer())
return oslo.messaging.RPCClient(TRANSPORT, target,
serializer=serializer)
def get_notifier(publisher_id):
"""Return a configured oslo.messaging notifier."""
return NOTIFIER.prepare(publisher_id=publisher_id)

View File

@ -13,22 +13,31 @@
from oslo.config import cfg
from heat.common import messaging
from heat.openstack.common import log
from heat.openstack.common.notifier import api as notifier_api
LOG = log.getLogger(__name__)
SERVICE = 'orchestration'
INFO = 'INFO'
ERROR = 'ERROR'
notifier_opts = [
cfg.StrOpt('default_notification_level',
default=INFO,
help='Default notification level for outgoing notifications'),
cfg.StrOpt('default_publisher_id',
help='Default publisher_id for outgoing notifications'),
cfg.MultiStrOpt('list_notifier_drivers',
help='List of drivers to send notifications (DEPRECATED)')
]
CONF = cfg.CONF
CONF.import_opt('default_notification_level',
'heat.openstack.common.notifier.api')
CONF.import_opt('default_publisher_id',
'heat.openstack.common.notifier.api')
CONF.register_opts(notifier_opts)
def _get_default_publisher():
publisher_id = CONF.default_publisher_id
if publisher_id is None:
publisher_id = notifier_api.publisher_id(SERVICE)
publisher_id = "%s.%s" % (SERVICE, CONF.host)
return publisher_id
@ -37,7 +46,7 @@ def get_default_level():
def notify(context, event_type, level, body):
client = messaging.get_notifier(_get_default_publisher())
notifier_api.notify(context, _get_default_publisher(),
"%s.%s" % (SERVICE, event_type),
level, body)
method = getattr(client, level.lower())
method(context, "%s.%s" % (SERVICE, event_type), body)

View File

@ -13,7 +13,6 @@
from heat.engine import api as engine_api
from heat.engine import notification
from heat.openstack.common.notifier import api as notifier_api
def send(stack,
@ -37,6 +36,6 @@ def send(stack,
level = notification.get_default_level()
if suffix == 'error':
level = notifier_api.ERROR
level = notification.ERROR
notification.notify(stack.context, event_type, level, body)

View File

@ -13,7 +13,6 @@
from heat.engine import api as engine_api
from heat.engine import notification
from heat.openstack.common.notifier import api as notifier_api
def send(stack):
@ -30,7 +29,7 @@ def send(stack):
suffix = 'end'
else:
suffix = 'error'
level = notifier_api.ERROR
level = notification.ERROR
event_type = '%s.%s.%s' % ('stack',
stack.action.lower(),

View File

@ -17,6 +17,7 @@ import json
import os
from oslo.config import cfg
from oslo import messaging
import six
import warnings
import webob
@ -24,6 +25,7 @@ import webob
from heat.common import context
from heat.common import exception
from heat.common import identifier
from heat.common import messaging as rpc_messaging
from heat.db import api as db_api
from heat.engine import api
from heat.engine import attributes
@ -39,9 +41,7 @@ from heat.engine import stack_lock
from heat.engine import watchrule
from heat.openstack.common.gettextutils import _
from heat.openstack.common import log as logging
from heat.openstack.common.rpc import common as rpc_common
from heat.openstack.common.rpc import proxy
from heat.openstack.common.rpc import service
from heat.openstack.common import service
from heat.openstack.common import threadgroup
from heat.openstack.common import timeutils
from heat.openstack.common import uuidutils
@ -62,7 +62,7 @@ def request_context(func):
try:
return func(self, ctx, *args, **kwargs)
except exception.HeatException:
raise rpc_common.ClientException()
raise messaging.rpc.dispatcher.ExpectedException()
return wrapped
@ -259,11 +259,17 @@ class EngineListener(service.Service):
engines to communicate with each other for multi-engine support.
'''
def __init__(self, host, engine_id, thread_group_mgr):
super(EngineListener, self).__init__(host, engine_id)
super(EngineListener, self).__init__()
self.thread_group_mgr = thread_group_mgr
self.engine_id = engine_id
def start(self):
super(EngineListener, self).start()
self.target = messaging.Target(
server=cfg.CONF.host, topic=self.engine_id)
server = rpc_messaging.get_rpc_server(self.target, self)
server.start()
def listening(self, ctxt):
'''
Respond affirmatively to confirm that the engine performing the
@ -291,9 +297,10 @@ class EngineService(service.Service):
RPC_API_VERSION = '1.1'
def __init__(self, host, topic, manager=None):
super(EngineService, self).__init__(host, topic)
super(EngineService, self).__init__()
resources.initialise()
self.host = host
self.topic = topic
# The following are initialized here, but assigned in start() which
# happens after the fork when spawning multiple worker processes
@ -301,6 +308,7 @@ class EngineService(service.Service):
self.listener = None
self.engine_id = None
self.thread_group_mgr = None
self.target = None
if cfg.CONF.instance_user:
warnings.warn('The "instance_user" option in heat.conf is '
@ -322,13 +330,21 @@ class EngineService(service.Service):
self.stack_watch.start_watch_task(s.id, admin_context)
def start(self):
self.thread_group_mgr = ThreadGroupManager()
self.engine_id = stack_lock.StackLock.generate_engine_id()
self.thread_group_mgr = ThreadGroupManager()
self.listener = EngineListener(self.host, self.engine_id,
self.thread_group_mgr)
LOG.debug("Starting listener for engine %s pid=%s, ppid=%s" %
(self.engine_id, os.getpid(), os.getppid()))
LOG.debug("Starting listener for engine %s" % self.engine_id)
self.listener.start()
target = messaging.Target(
version=self.RPC_API_VERSION, server=cfg.CONF.host,
topic=self.topic)
self.target = target
server = rpc_messaging.get_rpc_server(target, self)
server.start()
self._client = rpc_messaging.get_rpc_client(
version=self.RPC_API_VERSION)
super(EngineService, self).start()
def stop(self):
@ -702,12 +718,16 @@ class EngineService(service.Service):
:param stack_identity: Name of the stack you want to delete.
"""
def remote_stop(lock_engine_id):
rpc = proxy.RpcProxy(lock_engine_id, "1.0")
msg = rpc.make_msg("stop_stack", stack_identity=stack_identity)
timeout = cfg.CONF.engine_life_check_timeout
self.cctxt = self._client.prepare(
version='1.0',
timeout=timeout,
topic=lock_engine_id)
try:
rpc.call(cnxt, msg, topic=lock_engine_id, timeout=timeout)
except rpc_common.Timeout:
self.cctxt.call(cnxt,
'stop_stack',
stack_identity=stack_identity)
except messaging.MessagingTimeout:
return False
st = self._get_stack(cnxt, stack_identity)

View File

@ -15,14 +15,14 @@ import contextlib
import uuid
from oslo.config import cfg
from oslo import messaging
from heat.common import exception
from heat.common import messaging as rpc_messaging
from heat.db import api as db_api
from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _
from heat.openstack.common import log as logging
from heat.openstack.common.rpc import common as rpc_common
from heat.openstack.common.rpc import proxy
cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config')
@ -38,13 +38,12 @@ class StackLock(object):
@staticmethod
def engine_alive(context, engine_id):
topic = engine_id
rpc = proxy.RpcProxy(topic, "1.0")
msg = rpc.make_msg("listening")
client = rpc_messaging.get_rpc_client(version='1.0', topic=engine_id)
client_context = client.prepare(
timeout=cfg.CONF.engine_life_check_timeout)
try:
return rpc.call(context, msg, topic=topic,
timeout=cfg.CONF.engine_life_check_timeout)
except rpc_common.Timeout:
return client_context.call(context, 'listening')
except messaging.MessagingTimeout:
return False
@staticmethod

View File

@ -1,30 +0,0 @@
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo.config import cfg
from heat.openstack.common import notifier
class PublishErrorsHandler(logging.Handler):
def emit(self, record):
if ('heat.openstack.common.notifier.log_notifier' in
cfg.CONF.notification_driver):
return
notifier.api.notify(None, 'error.publisher',
'error_notification',
notifier.api.ERROR,
dict(error=record.getMessage()))

View File

@ -1,172 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import uuid
from oslo.config import cfg
from heat.openstack.common import context
from heat.openstack.common.gettextutils import _, _LE
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
from heat.openstack.common import log as logging
from heat.openstack.common import timeutils
LOG = logging.getLogger(__name__)
notifier_opts = [
cfg.MultiStrOpt('notification_driver',
default=[],
help='Driver or drivers to handle sending notifications'),
cfg.StrOpt('default_notification_level',
default='INFO',
help='Default notification level for outgoing notifications'),
cfg.StrOpt('default_publisher_id',
help='Default publisher_id for outgoing notifications'),
]
CONF = cfg.CONF
CONF.register_opts(notifier_opts)
WARN = 'WARN'
INFO = 'INFO'
ERROR = 'ERROR'
CRITICAL = 'CRITICAL'
DEBUG = 'DEBUG'
log_levels = (DEBUG, WARN, INFO, ERROR, CRITICAL)
class BadPriorityException(Exception):
pass
def notify_decorator(name, fn):
"""Decorator for notify which is used from utils.monkey_patch().
:param name: name of the function
:param function: - object of the function
:returns: function -- decorated function
"""
def wrapped_func(*args, **kwarg):
body = {}
body['args'] = []
body['kwarg'] = {}
for arg in args:
body['args'].append(arg)
for key in kwarg:
body['kwarg'][key] = kwarg[key]
ctxt = context.get_context_from_function_and_args(fn, args, kwarg)
notify(ctxt,
CONF.default_publisher_id or socket.gethostname(),
name,
CONF.default_notification_level,
body)
return fn(*args, **kwarg)
return wrapped_func
def publisher_id(service, host=None):
if not host:
try:
host = CONF.host
except AttributeError:
host = CONF.default_publisher_id or socket.gethostname()
return "%s.%s" % (service, host)
def notify(context, publisher_id, event_type, priority, payload):
"""Sends a notification using the specified driver
:param publisher_id: the source worker_type.host of the message
:param event_type: the literal type of event (ex. Instance Creation)
:param priority: patterned after the enumeration of Python logging
levels in the set (DEBUG, WARN, INFO, ERROR, CRITICAL)
:param payload: A python dictionary of attributes
Outgoing message format includes the above parameters, and appends the
following:
message_id
a UUID representing the id for this notification
timestamp
the GMT timestamp the notification was sent at
The composite message will be constructed as a dictionary of the above
attributes, which will then be sent via the transport mechanism defined
by the driver.
Message example::
{'message_id': str(uuid.uuid4()),
'publisher_id': 'compute.host1',
'timestamp': timeutils.utcnow(),
'priority': 'WARN',
'event_type': 'compute.create_instance',
'payload': {'instance_id': 12, ... }}
"""
if priority not in log_levels:
raise BadPriorityException(
_('%s not in valid priorities') % priority)
# Ensure everything is JSON serializable.
payload = jsonutils.to_primitive(payload, convert_instances=True)
msg = dict(message_id=str(uuid.uuid4()),
publisher_id=publisher_id,
event_type=event_type,
priority=priority,
payload=payload,
timestamp=str(timeutils.utcnow()))
for driver in _get_drivers():
try:
driver.notify(context, msg)
except Exception as e:
LOG.exception(_LE("Problem '%(e)s' attempting to "
"send to notification system. "
"Payload=%(payload)s")
% dict(e=e, payload=payload))
_drivers = None
def _get_drivers():
"""Instantiate, cache, and return drivers based on the CONF."""
global _drivers
if _drivers is None:
_drivers = {}
for notification_driver in CONF.notification_driver:
try:
driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver
except ImportError:
LOG.exception(_LE("Failed to load notifier %s. "
"These notifications will not be sent.") %
notification_driver)
return _drivers.values()
def _reset_drivers():
"""Used by unit tests to reset the drivers."""
global _drivers
_drivers = None

View File

@ -1,120 +0,0 @@
#
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import log as logging
list_notifier_drivers_opt = cfg.MultiStrOpt(
'list_notifier_drivers',
default=['heat.openstack.common.notifier.no_op_notifier'],
help='List of drivers to send notifications')
CONF = cfg.CONF
CONF.register_opt(list_notifier_drivers_opt)
LOG = logging.getLogger(__name__)
drivers = None
class ImportFailureNotifier(object):
"""Noisily re-raises some exception over-and-over when notify is called."""
def __init__(self, exception):
self.exception = exception
def notify(self, context, message):
raise self.exception
def _get_drivers():
"""Instantiates and returns drivers based on the flag values."""
global drivers
if drivers is None:
drivers = []
for notification_driver in CONF.list_notifier_drivers:
try:
drivers.append(importutils.import_module(notification_driver))
except ImportError as e:
drivers.append(ImportFailureNotifier(e))
return drivers
def add_driver(notification_driver):
"""Add a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
if isinstance(notification_driver, basestring):
# Load and add
try:
drivers.append(importutils.import_module(notification_driver))
except ImportError as e:
drivers.append(ImportFailureNotifier(e))
else:
# Driver is already loaded; just add the object.
drivers.append(notification_driver)
def _object_name(obj):
name = []
if hasattr(obj, '__module__'):
name.append(obj.__module__)
if hasattr(obj, '__name__'):
name.append(obj.__name__)
else:
name.append(obj.__class__.__name__)
return '.'.join(name)
def remove_driver(notification_driver):
"""Remove a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
removed = False
if notification_driver in drivers:
# We're removing an object. Easy.
drivers.remove(notification_driver)
removed = True
else:
# We're removing a driver by name. Search for it.
for driver in drivers:
if _object_name(driver) == notification_driver:
drivers.remove(driver)
removed = True
if not removed:
raise ValueError("Cannot remove; %s is not in list" %
notification_driver)
def notify(context, message):
"""Passes notification to multiple notifiers in a list."""
for driver in _get_drivers():
try:
driver.notify(context, message)
except Exception as e:
LOG.exception(_("Problem '%(e)s' attempting to send to "
"notification driver %(driver)s."), locals())
def _reset_drivers():
"""Used by unit tests to reset the drivers."""
global drivers
drivers = None

View File

@ -1,37 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#