Add support for multiple AMQP rpc backends

This is based on Oslo messaging API that supports RPC and notifications over a
number of different messsaging transports.

* remove old powervc.common.messaging and create a new one to adapt to Oslo
messaging

* adapt all sync service managers to new messaging model

Change-Id: I0c9b4a9fa5bb5d0eaac1433e768a110871d8dab8
Closes-Bug: 1363618
This commit is contained in:
Le Tian Ren 2014-09-12 19:46:57 +08:00
parent 05a8443e12
commit 0eefba3f77
12 changed files with 678 additions and 1144 deletions

View File

@ -14,11 +14,16 @@ from cinder.openstack.common import log
from powervc.common import config
from powervc.common.gettextutils import _
from powervc.volume.manager import constants
from powervc.common import messaging
from powervc.volume.driver import service as pvcservice
from powervc.common import utils
from powervc.common.client import delegate as ctx_delegate
from powervc.common import messaging
from oslo.messaging.notify import listener
from oslo.messaging import target
from oslo.messaging import transport
CONF = config.CONF
LOG = log.getLogger(__name__)
@ -142,82 +147,82 @@ class PowerVCCinderManager(service.Service):
self.volume_service.start()
def _create_powervc_listeners(self, ctx):
"""
Listen for out-of-band changes made in PowerVC.
"""Listen for out-of-band changes made in PowerVC.
This method creates the connection to the PowerVC Qpid broker and
This method creates the listner to the PowerVC AMQP broker and
sets up handlers so that any changes made directly in PowerVC are
reflected in the local OS.
:param: ctx The security context
"""
LOG.debug("Enter _create_powervc_listeners method")
# Function to call if we lose the Qpid connection and then get it back
def reconnect_handler():
LOG.debug('Re-established connection to Qpid broker, sync all '
'volume types on next sync interval')
self.full_volume_type_sync_required = True
# Create Qpid connection and listener
LOG.debug("Building connection with AMQP server")
conn = messaging.PowerVCConnection(reconnect_handler=reconnect_handler,
context=ctx,
log=logging)
LOG.debug("Creating message listener to linsten PowerVC event")
listener = conn.create_listener('cinder', 'notifications.info')
trans = transport.get_transport(config.AMQP_POWERVC_CONF)
targets = [
target.Target(exchange='cinder', topic='notifications')
]
endpoint = messaging.NotificationEndpoint(log=LOG, sec_context=ctx)
# Volume type creation
LOG.debug(_("Register event handler for %s event ")
% constants.EVENT_VOLUME_TYPE_CREATE)
listener.register_handler(constants.EVENT_VOLUME_TYPE_CREATE,
endpoint.register_handler(constants.EVENT_VOLUME_TYPE_CREATE,
self._handle_powervc_volume_type_create)
# Volume type deletion
LOG.debug(_("Register event handler for %s event ")
% constants.EVENT_VOLUME_TYPE_DELETE)
listener.register_handler(constants.EVENT_VOLUME_TYPE_DELETE,
endpoint.register_handler(constants.EVENT_VOLUME_TYPE_DELETE,
self._handle_powervc_volume_type_delete)
# Volume type extra spec changes
LOG.debug(_("Register event handler for %s event ")
% constants.EVENT_VOLUME_TYPE_EXTRA_SPECS_UPDATE)
listener.register_handler([
endpoint.register_handler([
constants.EVENT_VOLUME_TYPE_EXTRA_SPECS_UPDATE],
self._handle_powervc_volume_type_extra_spec_update)
LOG.debug(_("Register event handler for %s event ")
% constants.EVENT_VOLUME_CREATE_END)
listener.register_handler([constants.EVENT_VOLUME_CREATE_END],
endpoint.register_handler([constants.EVENT_VOLUME_CREATE_END],
self._handle_powervc_volume_create)
LOG.debug(_("Register event handler for %s event ")
% constants.EVENT_VOLUME_IMPORT_END)
listener.register_handler([constants.EVENT_VOLUME_IMPORT_END],
endpoint.register_handler([constants.EVENT_VOLUME_IMPORT_END],
self._handle_powervc_volume_create)
LOG.debug(_("Register event handler for %s event ")
% constants.EVENT_VOLUME_DELETE_END)
listener.register_handler([constants.EVENT_VOLUME_DELETE_END],
endpoint.register_handler([constants.EVENT_VOLUME_DELETE_END],
self._handle_powervc_volume_delete)
LOG.debug(_("Register event handler for %s event ")
% constants.EVENT_VOLUME_UPDATE)
listener.register_handler([constants.EVENT_VOLUME_UPDATE],
endpoint.register_handler([constants.EVENT_VOLUME_UPDATE],
self._handle_powervc_volume_update)
LOG.debug(_("Register event handler for %s event ")
% constants.EVENT_VOLUME_ATTACH_END)
listener.register_handler([constants.EVENT_VOLUME_ATTACH_END],
endpoint.register_handler([constants.EVENT_VOLUME_ATTACH_END],
self._handle_powervc_volume_update)
LOG.debug(_("Register event handler for %s event ")
% constants.EVENT_VOLUME_DETACH_END)
listener.register_handler([constants.EVENT_VOLUME_DETACH_END],
endpoint.register_handler([constants.EVENT_VOLUME_DETACH_END],
self._handle_powervc_volume_update)
endpoints = [
endpoint,
]
LOG.debug("Starting to listen...... ")
conn.start()
pvc_cinder_listener = listener.\
get_notification_listener(trans, targets, endpoints,
allow_requeue=False)
messaging.start_notification_listener(pvc_cinder_listener)
LOG.debug("Exit _create_powervc_listeners method")
def _periodic_volume_type_sync(self, context, vol_type_ids=None):
@ -373,16 +378,19 @@ class PowerVCCinderManager(service.Service):
self.tg.add_timer(sync_interval, sync)
def _handle_powervc_volume_type_create(self, context, message):
"""
Handle instance create messages sent from PowerVC.
def _handle_powervc_volume_type_create(self,
context=None,
ctxt=None,
event_type=None,
payload=None):
"""Handle instance create messages sent from PowerVC.
:param: context The security context
:param: message The AMQP message sent from OpenStack (dictionary)
:param: ctxt message context
:param: event_type message event type
:param: payload The AMQP message sent from OpenStack (dictionary)
"""
LOG.debug("Handling notification: %s" % message.get('event_type'))
payload = message.get('payload')
vol_type = payload.get('volume_types')
if(vol_type is None):
LOG.warning("Null volume type in volume.create notification")
@ -430,16 +438,19 @@ class PowerVCCinderManager(service.Service):
if volume_backend_name == storage_hostname:
self._insert_pvc_volume_type(context, vol_type)
def _handle_powervc_volume_type_delete(self, context, message):
"""
Handle instance delete messages sent from PowerVC.
def _handle_powervc_volume_type_delete(self,
context=None,
ctxt=None,
event_type=None,
payload=None):
"""Handle instance delete messages sent from PowerVC.
:param: context The security context
:param: message The AMQP message sent from OpenStack (dictionary)
:param: ctxt message context
:param: event_type message event type
:param: payload The AMQP message sent from OpenStack (dictionary)
"""
LOG.debug("Handling notification: %s" % message.get('event_type'))
payload = message.get('payload')
vol_type = payload.get('volume_types')
if(vol_type is None):
LOG.warning("Null volume type, ignore volume.create notification")
@ -460,18 +471,21 @@ class PowerVCCinderManager(service.Service):
# Remove the instance from the local OS
self._unregister_volume_types(context, pvc_vol_type_id)
def _handle_powervc_volume_type_extra_spec_update(self, context, message):
"""
Handle instance state changes sent from PowerVC. This includes
def _handle_powervc_volume_type_extra_spec_update(self,
context=None,
ctxt=None,
event_type=None,
payload=None):
"""Handle instance state changes sent from PowerVC. This includes
instance update and all other state changes caused by events like
power on, power off, resize, live migration, and snapshot.
:param: context The security context
:param: message The AMQP message sent from OpenStack (dictionary)
:param: ctxt message context
:param: event_type message event type
:param: payload The AMQP message sent from OpenStack (dictionary)
"""
event_type = message.get('event_type')
LOG.debug("Handling notification: %s" % event_type)
payload = message.get('payload')
pvc_vol_type_id = payload.get('type_id')
if(pvc_vol_type_id is None):
LOG.debug('Null volume type id, ignore extra specs update')
@ -720,16 +734,19 @@ class PowerVCCinderManager(service.Service):
break
return found
def _handle_powervc_volume_create(self, context, message):
"""
Handle volume create messages sent from PowerVC.
def _handle_powervc_volume_create(self,
context=None,
ctxt=None,
event_type=None,
payload=None):
"""Handle volume create messages sent from PowerVC.
:param: context The security context
:param: message The AMQP message sent from OpenStack (dictionary)
:param: ctxt message context
:param: event_type message event type
:param: payload The AMQP message sent from OpenStack (dictionary)
"""
LOG.debug("Handling notification: %s" % message.get('event_type'))
payload = message.get('payload')
pvc_volume_id = payload.get('volume_id')
# If the volume already exists locally then ignore
@ -751,16 +768,19 @@ class PowerVCCinderManager(service.Service):
LOG.debug('Volume not accessible, ignored!')
return
def _handle_powervc_volume_delete(self, context, message):
"""
Handle volume create messages sent from PowerVC.
def _handle_powervc_volume_delete(self,
context=None,
ctxt=None,
event_type=None,
payload=None):
"""Handle volume create messages sent from PowerVC.
:param: context The security context
:param: message The AMQP message sent from OpenStack (dictionary)
:param: ctxt message context
:param: event_type message event type
:param: payload The AMQP message sent from OpenStack (dictionary)
"""
LOG.debug("Handling notification: %s" % message.get('event_type'))
payload = message.get('payload')
pvc_volume_id = payload.get('volume_id')
# If the volume does not already exist locally then ignore
@ -771,16 +791,19 @@ class PowerVCCinderManager(service.Service):
self._unregister_volumes(context, local_volume.get('id'))
def _handle_powervc_volume_update(self, context, message):
"""
Handle volume create messages sent from PowerVC.
def _handle_powervc_volume_update(self,
context=None,
ctxt=None,
event_type=None,
payload=None):
"""Handle volume create messages sent from PowerVC.
:param: context The security context
:param: message The AMQP message sent from OpenStack (dictionary)
:param: ctxt message context
:param: event_type message event type
:param: payload The AMQP message sent from OpenStack (dictionary)
"""
LOG.debug("Handling notification: %s" % message.get('event_type'))
payload = message.get('payload')
pvc_volume_id = payload.get('volume_id')
local_volume = self._get_local_volume_by_pvc_id(context, pvc_volume_id)

View File

@ -29,7 +29,7 @@ fake_volume = {'display_name': 'fake_volume',
'instance_uuid': '',
'attach_status': ''}
fake_message = {'payload': {'volume_id': '', 'display_name': ''}}
fake_payload = {'volume_id': '', 'display_name': ''}
fake_context = {}
@ -81,7 +81,8 @@ class Test(unittest.TestCase):
self.moxer.ReplayAll()
self.manager._handle_powervc_volume_create(fake_context, fake_message)
self.manager._handle_powervc_volume_create(context=fake_context,
payload=fake_payload)
self.moxer.UnsetStubs()
self.moxer.VerifyAll()
@ -107,7 +108,8 @@ class Test(unittest.TestCase):
self.moxer.ReplayAll()
self.manager._handle_powervc_volume_create(fake_context, fake_message)
self.manager._handle_powervc_volume_create(context=fake_context,
payload=fake_payload)
self.moxer.UnsetStubs()
self.moxer.VerifyAll()

View File

@ -1,301 +1,132 @@
# Copyright 2013 IBM Corp.
# Copyright 2014 IBM Corp.
"""
This module contains Qpid connection utilities that can be used to connect
to a Qpid message broker and listen for notifications.
Examples:
# Import common messaging module
from powervc.common import messaging
# Connect to host OS Qpid broker and handle instance update notifications.
conn = messaging.LocalConnection(
reconnect_handler=self.handle_qpid_reconnect)
listener = conn.create_listener('nova', 'notifications.info')
listener.register_handler('compute.instance.update',
self._handle_instance_update)
conn.start()
# Connect to PowerVC Qpid broker and handle two event types with a single
# handler function.
conn = messaging.PowerVCConnection()
listener = conn.create_listener('nova', 'notifications.info')
listener.register_handler(['compute.instance.create.start',
'compute.instance.create.end'],
self._handle_instance_create)
conn.start()
# Connect to PowerVC Qpid broker and handle any instance notifications.
conn = messaging.PowerVCConnection()
listener = conn.create_listener('nova', 'notifications.info')
listener.register_handler('compute.instance.*',
self._handle_instance_notifications)
conn.start()
"""This module contains common structures and functions that help to handle
AMQP messages based on olso.messaging framework.
"""
import fnmatch
import json
import sys
import threading
import traceback
import time
from time import sleep
from qpid.messaging import Connection
from qpid.messaging.exceptions import ConnectionError
from powervc.common import config
from powervc.common.gettextutils import _
from oslo.messaging.notify import dispatcher
def log(log, level, msg):
class NotificationEndpoint(object):
"""Message listener endpoint, used to register handler functions, receive
and dispatch notification messages.
"""
Log a message.
MSG_LEVEL = {0: 'AUDIT', 1: 'DEBUG', 2: 'INFO', 3: 'WARN',
4: 'ERROR', 5: 'CRITICAL', 6: 'SAMPLE'}
:param: log The log to write to.
:param: level The logging level for the message
:param: msg The message to log
"""
if not log:
return
if level == 'critical':
log.critical(msg)
elif level == 'error':
log.error(msg)
elif level == 'warn':
log.warn(msg)
elif level == 'info':
log.info(msg)
elif level == 'debug':
log.debug(msg)
def __init__(self, log=None, sec_context=None):
"""Create a NotificationEndpoint object, the core part of a listener.
class QpidConnection(object):
"""
This class represents a connection to a Qpid broker. A QpidConnection must
be created in order to send or receive AMQP messages using a Qpid broker.
"""
def __init__(self, url, username, password, transport='tcp',
reconnection_interval=60, reconnect_handler=None,
context=None, log=None):
:param: log logger used when handle messages.
:param: sec_context this is a security context contains keystone auth.
token for API access, not the context sent by message notifier.
"""
Create a new connection to a Qpid message broker in order to send or
receive AMQP messages.
:param: url URL for the Qpid connection, e.g. 9.10.49.164:5672
:param: username Qpid username
:param: password Qpid password
:param: transport Transport mechanism, one of tcp, tcp+tls,
or ssl (alias for tcp+tls).
:param: reconnection_interval Interval in seconds between reconnect
attempts.
:param: reconnect_handler The function to call upon reconnecting to
the Qpid broker after connection was lost and
then reestablished. This function will be called after the
connections is reestablished but before the listeners are
started up again. It is not passed any parameters.
:param: context The security context
:param: log The logging module used for logging messages. If not
provided then no logging will be done.
"""
self.url = url
self.username = username
self.password = password
self.context = context
self.log = log.getLogger(__name__) if log else None
self.transport = transport
self.reconnection_interval = reconnection_interval
self.reconnect_handler = reconnect_handler
self._listeners = []
self._is_connected = False
def create_listener(self, exchange, topic):
"""
Create a new listener on the given exchange for the given topic.
:param: exchange The name of the Qpid exchange, e.g. 'nova'
:param: topic The topic to listen for, e.g. 'notifications.info'
:returns: A new QpidListener that will listen for messages on the
given exchange and topic.
"""
listener = QpidListener(self, exchange, topic)
self._listeners.append(listener)
return listener
def start(self, is_reconnect=False):
"""
Initiate the Qpid connection and start up any listeners.
:param: is_reconnect True if this method is called as part of a
reconnect attempt, False otherwise
:raise: ConnectionError if a connection cannot be established
"""
# If the Qpid broker URL is not specified (or just the hostname is not
# specified) then we can't make a connection.
if not self.url or self.url.startswith(':'):
log(self.log, 'warn', _('Qpid broker not specified, cannot start '
'connection.'))
return
if not self._is_connected:
self.conn = Connection(self.url, username=self.username,
password=self.password,
transport=self.transport)
try:
self.conn.open()
except ConnectionError as e:
log(self.log, 'critical', _('Cannot connect to Qpid message '
'broker: %s') % (e.message))
# close this connection when encounter connection error
# otherwise, it will leave an ESTABLISHED connection
# to qpid server forever.
if self.conn is not None:
self.conn.close()
raise e
self._is_connected = True
if is_reconnect and self.reconnect_handler:
self.reconnect_handler()
for listener in self._listeners:
listener._start(self.conn)
log(self.log, 'info', _('Connected to Qpid message broker: '
'%s@%s') % (self.username, self.url))
def _reconnect(self):
"""
Attempt to reconnect to the Qpid message broker in intervals until the
connection comes back.
"""
self.conn = None
class ReconnectionThread(threading.Thread):
def __init__(self, qpid_connection):
super(ReconnectionThread, self).__init__(
name='ReconnectionThread')
self.qpid_connection = qpid_connection
def run(self):
while not self.qpid_connection._is_connected:
try:
self.qpid_connection.start(is_reconnect=True)
except ConnectionError:
sleep(self.qpid_connection.reconnection_interval)
pass
reconnection_thread = ReconnectionThread(self)
reconnection_thread.start()
def set_reconnect_handler(self, reconnect_handler):
"""
Set the function to call upon reconnecting to the Qpid broker after
connection is lost and then reestablished.
:param: reconnect_handler The function to call upon reconnecting.
"""
self.reconnect_handler = reconnect_handler
class PowerVCConnection(QpidConnection):
"""
This class represents a connection to the PowerVC Qpid broker as defined
in the configuration property files.
"""
def __init__(self, reconnect_handler=None, context=None, log=None):
"""
Create a new connection to the PowerVC Qpid message broker in order
to send or receive AMQP messages.
:param: reconnect_handler The function to call upon reconnecting to
the Qpid broker after connection was lost and
then reestablished. This function will be called after the
connection is reestablished but before the listeners are
started up again. It is not passed any parameters.
:param: context The security context
:param: log The logging module used for logging messages. If not
provided then no logging will be done.
"""
if config.AMQP_POWERVC_CONF.qpid_protocol == 'ssl':
transport = 'ssl'
else:
transport = 'tcp'
super(PowerVCConnection,
self).__init__('%s:%d' % (
config.AMQP_POWERVC_CONF.qpid_hostname,
config.AMQP_POWERVC_CONF.qpid_port),
config.AMQP_POWERVC_CONF.qpid_username,
config.AMQP_POWERVC_CONF.qpid_password,
reconnect_handler=reconnect_handler,
context=context, log=log, transport=transport)
class LocalConnection(QpidConnection):
"""
This class represents a connection to the local OS Qpid broker as defined
in the configuration property files.
"""
def __init__(self, reconnect_handler=None, context=None, log=None):
"""
Create a new connection to the local OS Qpid message broker in order
to send or receive AMQP messages.
:param: reconnect_handler The function to call upon reconnecting to
the Qpid broker after connection was lost and
then reestablished. This function will be called after the
connection is reestablished but before the listeners are
started up again. It is not passed any parameters.
:param: context The security context
:param: log The logging module used for logging messages. If not
provided then no logging will be done.
"""
if config.AMQP_OPENSTACK_CONF.qpid_protocol == 'ssl':
transport = 'ssl'
else:
transport = 'tcp'
super(LocalConnection,
self).__init__('%s:%d' % (
config.AMQP_OPENSTACK_CONF.qpid_hostname,
config.AMQP_OPENSTACK_CONF.qpid_port),
config.AMQP_OPENSTACK_CONF.qpid_username,
config.AMQP_OPENSTACK_CONF.qpid_password,
reconnect_handler=reconnect_handler,
context=context, log=log, transport=transport)
class QpidListener(object):
'''
This class is used to listen for AMQP message notifications. It should
probably not be instantiated directly. First create a QpidConnection and
then add a QpidListener to the connection using the
QpidConnection.create_listener() method.
'''
def __init__(self, qpid_connection, exchange, topic):
"""
Create a new QpidListener object to listen for AMQP messages.
:param: qpid_connection The QpidConnection object used for connecting
to the Qpid message broker.
:param: exchange The name of the Qpid exchange, e.g. 'nova'
:param: topic The topic to listen for, e.g. 'notifications.info'
"""
self.qpid_connection = qpid_connection
self.exchange = exchange
self.topic = topic
self._handler_map = {}
self._count_since_acknowledge = 0
self._log = log
self._sec_ctxt = sec_context
def audit(self, ctxt, publisher_id, event_type, payload, metadata):
"""Receive a notification at audit level."""
return self._dispatch(0, ctxt, publisher_id,
event_type, payload, metadata)
def debug(self, ctxt, publisher_id, event_type, payload, metadata):
"""Receive a notification at debug level."""
return self._dispatch(1, ctxt, publisher_id,
event_type, payload, metadata)
def info(self, ctxt, publisher_id, event_type, payload, metadata):
"""Receive a notification at info level."""
return self._dispatch(2, ctxt, publisher_id,
event_type, payload, metadata)
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
"""Receive a notification at warning level."""
return self._dispatch(3, ctxt, publisher_id,
event_type, payload, metadata)
def error(self, ctxt, publisher_id, event_type, payload, metadata):
"""Receive a notification at error level."""
return self._dispatch(4, ctxt, publisher_id,
event_type, payload, metadata)
def critical(self, ctxt, publisher_id, event_type, payload, metadata):
"""Receive a notification at critical level."""
return self._dispatch(5, ctxt, publisher_id,
event_type, payload, metadata)
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
"""Receive a notification at sample level.
Sample notifications are for high-frequency events
that typically contain small payloads. eg: "CPU = 70%"
Not all drivers support the sample level
(log, for example) so these could be dropped.
"""
return self._dispatch(6, ctxt, publisher_id,
event_type, payload, metadata)
def _dispatch(self, level, ctxt, publisher_id, event_type, payload,
metadata):
"""Route message to handlers according event_type registered.
"""
handlers = self._get_handlers(event_type)
try:
if handlers:
if self._log and self._log.isEnabledFor('INFO'):
self._log.info("'%s' level '%s' type message is received. "
"Routing to handlers..."
% (self.MSG_LEVEL[level], event_type)
)
for handler in handlers:
start_time = time.time()
handler(context=self._sec_ctxt,
ctxt=ctxt,
event_type=event_type,
payload=payload,
)
end_time = time.time()
if self._log and self._log.isEnabledFor('DEBUG'):
self._log.debug("handler '%s' uses '%f' time(s)"
% (handler, end_time - start_time)
)
return dispatcher.NotificationResult.HANDLED
except Exception:
self._log.exception("Error handling '%(level)s' level '%(type)s' "
"type message '%(msg)s'."
% {'level': self.MSG_LEVEL[level],
'type': event_type,
'msg': payload,
}
)
# TODO(gpanda): consider if requeue is needed in the future,
# not all transport drivers implement support for requeueing, if
# the driver does not support requeueing, it will raise
# NotImplementedError. As far as I tested(oslo.messaging 1.3.1 +
# qpidd 0.14), it doesn't support.
# return dispatcher.NotificationResult.REQUEUE
finally:
pass
def _get_handlers(self, event_type):
"""Get a list of all the registered handlers that match the given event
type.
"""
handlers = []
for event_type_pattern in self._handler_map:
if fnmatch.fnmatch(event_type, event_type_pattern):
handlers.append(self._handler_map.get(event_type_pattern))
return handlers
def register_handler(self, event_type, handler):
"""
Register a handler function for one or more message notification event
types. The handler function will be called when a message is
"""Register a handler function for one or more message notification
event types. The handler function will be called when a message is
received that matches the event type. The handler function will be
passed two arguments: the security context and a dictionary containing
the message attributes. The message attributes include: event_type,
@ -333,173 +164,15 @@ class QpidListener(object):
for et in event_type:
self._handler_map[et] = handler
def unregister_handler(self, event_type):
"""
Stop handling the given message notification event type.
:param: event_type The event type to unregister
"""
try:
self._handler_map.pop(event_type)
except KeyError:
log(self.qpid_connection.log, 'warn',
_('There is no handler for this event type: %s') % event_type)
def start_notification_listener(notification_listener):
def _run():
notification_listener.start()
notification_listener.wait()
def _start(self, connection):
"""
Start listening for messages. This method should probably not be called
directly. After creating a QpidConnection and adding listeners using
the create_listener() method, use the QpidConnection.start() method to
start listening for messages. The QpidConnection will start up all of
the listeners.
:param: connection The qpid.messaging.endpoints.Connection object used
to establish the connection to the message broker.
"""
self.session = connection.session('%s/%s' %
(self.exchange, self.topic))
addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
"durable": True,
"auto-delete": True
},
},
}
connection_info = "%s / %s ; %s" % (self.exchange, self.topic,
json.dumps(addr_opts))
self.receiver = self.session.receiver(connection_info)
log(self.qpid_connection.log, 'debug',
_('QpidListener session info: %s') % (json.dumps(connection_info)))
"""
A listener blocks while it waits for the next message on the queue,
so we initiate a thread to run the listening function.
"""
t = threading.Thread(target=self._listen)
t.start()
def _has_more_messages(self):
'''
Determine if there are any new messages in the queue.
:returns: True if there are messages on the queue, False otherwise
'''
return bool(self.receiver)
def _next_message(self):
'''
Wait for the next message on the queue.
:returns: The raw message object from the message queue
'''
return self.receiver.fetch()
def _acknowledge(self):
'''
Acknowledge a message has been received.
'''
self.session.acknowledge(sync=False)
def _get_handlers(self, event_type):
"""
Get a list of all the registered handlers that match the given event
type.
"""
handlers = []
for event_type_pattern in self._handler_map:
if fnmatch.fnmatch(event_type, event_type_pattern):
handlers.append(self._handler_map.get(event_type_pattern))
return handlers
def _dispatch(self, message):
'''
Dispatch a message to its specific handler.
:param: message A dictionary containing the OpenStack message
notification attributes (event_type, timestamp,
message_id, priority, publisher_id, payload)
'''
event_type = message.get('event_type')
handlers = self._get_handlers(event_type)
log_ = self.qpid_connection.log
self._count_since_acknowledge += 1
try:
if handlers:
log(log_, 'debug', _('Dispatching message to handlers'))
log(log_, 'info', _('Qpid listener received '
'message of event type: %s'
% message['event_type']))
for handler in handlers:
handler(self.qpid_connection.context, message)
except Exception, e:
log(log_, 'error', _('Error handling message: %s: %s. Message: '
'%s.') % (Exception, e, message))
# Print stack trace
exc_type, exc_value, exc_traceback = sys.exc_info()
log(log_, 'error', _('error type %s') % (exc_type))
log(log_, 'error', _('error object %s') % (exc_value))
log(log_, 'error', ''.join(traceback.format_tb(exc_traceback)))
finally:
if self._count_since_acknowledge > 100:
self._count_since_acknowledge = 0
self._acknowledge()
def _resolve_message(self, raw_message):
'''
Resolves the given raw message obtained from the Qpid message queue
into a message that can be dispatched to a handler function.
:param: raw_message A raw message obtained from the Qpid message
queue
:returns: A dictionary containing the following keys:
event_type, timestamp, message_id, priority, publisher_id, payload
'''
content_type = raw_message.content_type
if content_type == 'application/json; charset=utf8':
content = json.loads(raw_message.content)
elif content_type == 'amqp/map':
content = raw_message.content
else:
log(self.qpid_connection.log,
'warn',
_('Qpid listener received unsupported message: '
'%s\nwith content_type %s') % (raw_message.content,
content_type))
return None
message = dict()
for attr in ['event_type', 'timestamp', 'message_id', 'priority',
'publisher_id', 'payload']:
message[attr] = content.get(attr)
log(self.qpid_connection.log, 'debug', _('Qpid listener received '
'message: %s') % (message))
return message
def _listen(self):
'''
Handle messages when they arrive on the message queue.
'''
while True:
try:
if self._has_more_messages():
raw_message = self._next_message()
message = self._resolve_message(raw_message)
if message is None:
continue
self._dispatch(message)
else:
break
except ConnectionError, e:
log(self.qpid_connection.log, 'warn',
_("Connection error: %s") % (e))
self.qpid_connection._is_connected = False
self.qpid_connection._reconnect()
break
except Exception, e:
log(self.qpid_connection.log, 'warn',
_("Unknown error happens for event listener: %s") % (e))
"""
A listener blocks while it waits for the next message on the queue,
so we initiate a thread to run the listening function.
"""
t = threading.Thread(target=_run)
t.start()

View File

@ -1,56 +0,0 @@
# Copyright 2013 IBM Corp.
import unittest
from powervc.common.messaging import QpidConnection
class QpidTest(unittest.TestCase):
def setUp(self):
super(QpidTest, self).setUp()
self.conn = QpidConnection(url='127.0.0.1:5989',
username='test_username',
password='test_passwd',
transport='tcp',
reconnection_interval=60,
reconnect_handler=None,
context=None,
log=None)
def test_create_listener(self):
self.listener = self.conn.\
create_listener('test_exchange', 'test_topic')
self.assertNotEqual(self.listener, None)
self.assertEqual([self.listener], self.conn._listeners)
def test_register_handler(self):
def _fake_handler():
pass
if not hasattr(self, 'listener'):
self.listener = self.conn.\
create_listener('test_exchange', 'test_topic')
self.listener.register_handler('foo.bar.*', _fake_handler)
self.assertEqual(self.listener._handler_map['foo.bar.*'],
_fake_handler)
def test_unregister_handler(self):
def _fake_handler():
pass
if not hasattr(self, 'listener'):
self.listener = self.conn.\
create_listener('test_exchange', 'test_topic')
self.listener.register_handler('foo.bar.*', _fake_handler)
self.assertEqual(self.listener._handler_map['foo.bar.*'],
_fake_handler)
self.listener.unregister_handler('foo.bar.*')
self.assertEqual(self.listener._handler_map,
{})
def tearDown(self):
unittest.TestCase.tearDown(self)
if __name__ == "__main__":
unittest.main()

View File

@ -66,6 +66,9 @@ LOCAL = 'local'
EVENT_TYPE = 'type'
EVENT_CONTEXT = 'context'
EVENT_MESSAGE = 'message'
EVENT_PAYLOAD = 'payload'
REAL_EVENT_TYPE = 'real_type'
REAL_EVENT_CONTEXT = 'ctxt'
# Event queue event types
LOCAL_IMAGE_EVENT = LOCAL
@ -77,7 +80,7 @@ STARTUP_SCAN_EVENT = 'startup'
IMAGE_EVENT_EXCHANGE = 'glance'
# Image notification event topic
IMAGE_EVENT_TOPIC = 'notifications.info'
IMAGE_EVENT_TOPIC = 'notifications'
# Image notification event types
IMAGE_EVENT_TYPE_ALL = 'image.*'

View File

@ -11,7 +11,6 @@ import Queue
import threading
import itertools
from operator import itemgetter
import HTMLParser
from powervc.common import config
@ -23,7 +22,6 @@ from glanceclient.exc import CommunicationError
from glanceclient.exc import HTTPNotFound
from powervc.common import constants as consts
from powervc.common import messaging
from powervc.common.exception import StorageConnectivityGroupNotFound
from powervc.common.gettextutils import _
from powervc.common.client import factory as clients
@ -31,6 +29,12 @@ from powervc.glance.common import constants
from powervc.glance.common import config as glance_config
from powervc.common import utils
from powervc.common import messaging
from oslo.messaging.notify import listener
from oslo.messaging import target
from oslo.messaging import transport
CONF = glance_config.CONF
LOG = logging.getLogger(__name__)
@ -2305,9 +2309,8 @@ class PowerVCImageManager(service.Service):
self._start_pvc_event_handler()
def _start_local_event_handler(self):
"""
Start the local hosting OS image notification event handler if it's not
already running.
"""Start the local hosting OS image notification event handler if it's
not already running.
The event handler is not started if the qpid_hostname is not specified
in the configuration.
@ -2317,45 +2320,35 @@ class PowerVCImageManager(service.Service):
if self.local_event_handler_running:
return
def local_event_reconnect_handler():
"""
The reconnect handler will start a periodic scan operation.
"""
LOG.info(_("Processing local event handler reconnection..."))
self._add_periodic_sync_to_queue()
LOG.debug("Enter _start_local_event_handler method")
try:
trans = transport.get_transport(config.AMQP_OPENSTACK_CONF)
targets = [
target.Target(exchange=constants.IMAGE_EVENT_EXCHANGE,
topic=constants.IMAGE_EVENT_TOPIC)
]
endpoint = messaging.NotificationEndpoint(log=LOG)
# See if the host is specified. If not, do not attempt to connect
# and register the event handler
host = config.AMQP_OPENSTACK_CONF.qpid_hostname
if host and host is not None:
local_conn = messaging.LocalConnection(
reconnect_handler=local_event_reconnect_handler,
log=logging)
local_listener = local_conn.create_listener(
constants.IMAGE_EVENT_EXCHANGE,
constants.IMAGE_EVENT_TOPIC)
endpoint.register_handler(constants.IMAGE_EVENT_TYPE_ALL,
self._local_image_notifications)
# Register the handler to begin processing messages
local_listener.register_handler(
constants.IMAGE_EVENT_TYPE_ALL,
self._local_image_notifications)
local_conn.start()
LOG.info(_('Monitoring local hosting OS for Image '
'notification events...'))
self.local_event_handler_running = True
else:
LOG.warning(_('Local hosting OS image event handling could '
'not be started because the qpid_host was not '
'specified in the configuration file.'))
except Exception as e:
LOG.exception(_('An error occurred starting the local hosting OS '
'image notification event handler: %s'), e)
endpoints = [
endpoint,
]
LOG.debug("Starting to listen...... ")
local_glance_listener = listener.\
get_notification_listener(trans, targets, endpoints,
allow_requeue=False)
messaging.start_notification_listener(local_glance_listener)
LOG.debug("Exit _start_local_event_handler method")
self.local_event_handler_running = True
def _start_pvc_event_handler(self):
"""
Start the PowerVC image notification event handler if not already
"""Start the PowerVC image notification event handler if not already
running.
The event handler is not started if the powervc_qpid_hostname is
@ -2366,41 +2359,32 @@ class PowerVCImageManager(service.Service):
if self.pvc_event_handler_running:
return
def pvc_event_reconnect_handler():
"""
The reconnect handler will start a periodic scan operation.
"""
LOG.info(_("Processing PowerVC event handler reconnection..."))
self._add_periodic_sync_to_queue()
LOG.debug("Enter _start_pvc_event_handler method")
try:
trans = transport.get_transport(config.AMQP_POWERVC_CONF)
targets = [
target.Target(exchange=constants.IMAGE_EVENT_EXCHANGE,
topic=constants.IMAGE_EVENT_TOPIC)
]
endpoint = messaging.NotificationEndpoint(log=LOG)
# See if the host is specified. If not, do not attempt to connect
# and register the event handler
host = config.AMQP_POWERVC_CONF.qpid_hostname
if host and host is not None:
pvc_conn = messaging.PowerVCConnection(
reconnect_handler=pvc_event_reconnect_handler, log=logging)
pvc_listener = pvc_conn.create_listener(
constants.IMAGE_EVENT_EXCHANGE,
constants.IMAGE_EVENT_TOPIC)
endpoint.register_handler(constants.IMAGE_EVENT_TYPE_ALL,
self._pvc_image_notifications)
# Register the handler to begin processing messages
pvc_listener.register_handler(
constants.IMAGE_EVENT_TYPE_ALL,
self._pvc_image_notifications)
pvc_conn.start()
LOG.info(_('Monitoring PowerVC for Image notification '
'events...'))
self.pvc_event_handler_running = True
else:
LOG.warning(_('PowerVC image event handling could not be '
'started because the powervc_qpid_host was not '
'specified in the configuration file.'))
endpoints = [
endpoint,
]
except Exception as e:
LOG.exception(_('An error occurred starting the PowerVC image '
'notification event handler: %s'), e)
LOG.debug("Starting to listen...... ")
pvc_glance_listener = listener.\
get_notification_listener(trans, targets, endpoints,
allow_requeue=False)
messaging.start_notification_listener(pvc_glance_listener)
LOG.debug("Exit _start_pvc_event_handler method")
self.pvc_event_handler_running = True
def _process_event_queue(self):
"""
@ -2421,17 +2405,28 @@ class PowerVCImageManager(service.Service):
str(self.local_events_to_ignore_dict))
LOG.debug(_('pvc events to ignore: %s'),
str(self.pvc_events_to_ignore_dict))
event_type = event.get(constants.EVENT_TYPE)
context = event.get(constants.EVENT_CONTEXT)
message = event.get(constants.EVENT_MESSAGE)
event_type = event.get(constants.EVENT_TYPE)
ctxt = event.get(constants.REAL_EVENT_CONTEXT)
real_type = event.get(constants.REAL_EVENT_TYPE)
payload = event.get(constants.EVENT_PAYLOAD)
if event_type == constants.LOCAL_IMAGE_EVENT:
LOG.debug(_('Processing a local hostingOS image event on '
'the event queue: %s'), str(event))
self._handle_local_image_notifications(context, message)
self.\
_handle_local_image_notifications(context=context,
ctxt=ctxt,
event_type=real_type,
payload=payload,
)
elif event_type == constants.PVC_IMAGE_EVENT:
LOG.debug(_('Processing a PowerVC image event on '
'the event queue: %s'), str(event))
self._handle_pvc_image_notifications(context, message)
self._handle_pvc_image_notifications(context=context,
ctxt=ctxt,
event_type=real_type,
payload=payload,
)
elif event_type == constants.PERIODIC_SCAN_EVENT:
LOG.debug(_('Processing a periodic sync event on '
'the event queue: %s'), str(event))
@ -2449,23 +2444,35 @@ class PowerVCImageManager(service.Service):
finally:
self.event_queue.task_done()
def _local_image_notifications(self, context, message):
"""
Place the local image event on the event queue for processing.
def _local_image_notifications(self,
context=None,
ctxt=None,
event_type=None,
payload=None):
"""Place the local image event on the event queue for processing.
:param: context The event security context
:param: message The event message
:param: context The security context
:param: ctxt message context
:param: event_type message event type
:param: payload The AMQP message sent from OpenStack (dictionary)
"""
event = {}
event[constants.EVENT_TYPE] = constants.LOCAL_IMAGE_EVENT
event[constants.EVENT_CONTEXT] = context
event[constants.EVENT_MESSAGE] = message
event[constants.REAL_EVENT_CONTEXT] = ctxt
event[constants.REAL_EVENT_TYPE] = event_type
event[constants.EVENT_PAYLOAD] = payload
LOG.debug(_('Adding local image event to event queue: %s'), str(event))
self.event_queue.put(event)
def _handle_local_image_notifications(self, context, message):
"""
Handle image notification events received from the local hosting OS.
def _handle_local_image_notifications(self,
context=None,
ctxt=None,
event_type=None,
payload=None,
):
"""Handle image notification events received from the local hosting OS.
Only handle update, and delete event types. The activate event
is processed, but only to add the new image to the update_at dict.
@ -2474,14 +2481,13 @@ class PowerVCImageManager(service.Service):
event from PowerVC to the ignore list. Then when that event arrives
from PowerVC because of this update we will ignore it.
:param: context The event security context
:param: message The event message
:param: context The security context
:param: ctxt message context
:param: event_type message event type
:param: payload The AMQP message sent from OpenStack (dictionary)
"""
if message is None:
LOG.debug(_('The local image event notification had no message!'))
return
event_type = message.get('event_type')
v1image_dict = message.get('payload')
v1image_dict = payload
if event_type == constants.IMAGE_EVENT_TYPE_UPDATE:
self._process_local_image_update_event(v1image_dict)
elif event_type == constants.IMAGE_EVENT_TYPE_DELETE:
@ -2491,7 +2497,11 @@ class PowerVCImageManager(service.Service):
elif event_type == constants.IMAGE_EVENT_TYPE_CREATE:
self._process_local_image_create_event(v1image_dict)
else:
LOG.debug(_('Did not process event: %s'), str(message))
LOG.debug(_("Did not process event: type:'%(event_type)s' type, "
"payload:'%(payload)s'"
)
% (event_type, payload)
)
def _process_local_image_update_event(self, v1image_dict):
"""
@ -2815,24 +2825,37 @@ class PowerVCImageManager(service.Service):
'\'%s\'. The PowerVC UUID is not known.'),
local_name)
def _pvc_image_notifications(self, context, message):
"""
Place the PowerVC image event on the event queue for processing.
def _pvc_image_notifications(self,
context=None,
ctxt=None,
event_type=None,
payload=None):
"""Place the PowerVC image event on the event queue for processing.
:param: context The event security context
:param: message The event message
:param: context The security context
:param: ctxt message context
:param: event_type message event type
:param: payload The AMQP message sent from OpenStack (dictionary)
"""
event = {}
event[constants.EVENT_TYPE] = constants.PVC_IMAGE_EVENT
event[constants.EVENT_CONTEXT] = context
event[constants.EVENT_MESSAGE] = message
event[constants.REAL_EVENT_CONTEXT] = ctxt
event[constants.REAL_EVENT_TYPE] = event_type
event[constants.EVENT_PAYLOAD] = payload
LOG.debug(_('Adding PowerVC image event to event queue: %s'),
str(event))
self.event_queue.put(event)
def _handle_pvc_image_notifications(self, context, message):
"""
Handle image notification events received from PowerVC.
def _handle_pvc_image_notifications(self,
context=None,
ctxt=None,
event_type=None,
payload=None,
):
"""Handle image notification events received from PowerVC.
Only handle activate, update, and delete event types.
There is a scheme in place to keep events from ping-ponging back
@ -2841,15 +2864,13 @@ class PowerVCImageManager(service.Service):
that event arrives from the hosting OS because of this update we
will ignore it.
:param: context The event security context
:param: message The event message
:param: context The security context
:param: ctxt message context
:param: event_type message event type
:param: payload The AMQP message sent from OpenStack (dictionary)
"""
if message is None:
LOG.debug(_('The PowerVC image event notification had no '
'message!'))
return
event_type = message.get('event_type')
v1image_dict = message.get('payload')
v1image_dict = payload
if event_type == constants.IMAGE_EVENT_TYPE_UPDATE:
self._process_pvc_image_update_event(v1image_dict)
elif event_type == constants.IMAGE_EVENT_TYPE_DELETE:
@ -2857,7 +2878,11 @@ class PowerVCImageManager(service.Service):
elif event_type == constants.IMAGE_EVENT_TYPE_ACTIVATE:
self._process_pvc_image_activate_event(v1image_dict)
else:
LOG.debug(_('Did not process event: %s'), str(message))
LOG.debug(_("Did not process event: type:'%(event_type)s' type, "
"payload:'%(payload)s'"
)
% (event_type, payload)
)
def _process_pvc_image_update_event(self, v1image_dict):
"""