diff --git a/cinder-powervc/powervc/volume/manager/manager.py b/cinder-powervc/powervc/volume/manager/manager.py index 0dc457e..1fb09bc 100644 --- a/cinder-powervc/powervc/volume/manager/manager.py +++ b/cinder-powervc/powervc/volume/manager/manager.py @@ -14,11 +14,16 @@ from cinder.openstack.common import log from powervc.common import config from powervc.common.gettextutils import _ from powervc.volume.manager import constants -from powervc.common import messaging from powervc.volume.driver import service as pvcservice from powervc.common import utils from powervc.common.client import delegate as ctx_delegate +from powervc.common import messaging + +from oslo.messaging.notify import listener +from oslo.messaging import target +from oslo.messaging import transport + CONF = config.CONF LOG = log.getLogger(__name__) @@ -142,82 +147,82 @@ class PowerVCCinderManager(service.Service): self.volume_service.start() def _create_powervc_listeners(self, ctx): - """ - Listen for out-of-band changes made in PowerVC. + """Listen for out-of-band changes made in PowerVC. - This method creates the connection to the PowerVC Qpid broker and + This method creates the listner to the PowerVC AMQP broker and sets up handlers so that any changes made directly in PowerVC are reflected in the local OS. :param: ctx The security context """ LOG.debug("Enter _create_powervc_listeners method") - # Function to call if we lose the Qpid connection and then get it back - def reconnect_handler(): - LOG.debug('Re-established connection to Qpid broker, sync all ' - 'volume types on next sync interval') - self.full_volume_type_sync_required = True - - # Create Qpid connection and listener - LOG.debug("Building connection with AMQP server") - conn = messaging.PowerVCConnection(reconnect_handler=reconnect_handler, - context=ctx, - log=logging) - LOG.debug("Creating message listener to linsten PowerVC event") - listener = conn.create_listener('cinder', 'notifications.info') + trans = transport.get_transport(config.AMQP_POWERVC_CONF) + targets = [ + target.Target(exchange='cinder', topic='notifications') + ] + endpoint = messaging.NotificationEndpoint(log=LOG, sec_context=ctx) # Volume type creation LOG.debug(_("Register event handler for %s event ") % constants.EVENT_VOLUME_TYPE_CREATE) - listener.register_handler(constants.EVENT_VOLUME_TYPE_CREATE, + endpoint.register_handler(constants.EVENT_VOLUME_TYPE_CREATE, self._handle_powervc_volume_type_create) # Volume type deletion LOG.debug(_("Register event handler for %s event ") % constants.EVENT_VOLUME_TYPE_DELETE) - listener.register_handler(constants.EVENT_VOLUME_TYPE_DELETE, + endpoint.register_handler(constants.EVENT_VOLUME_TYPE_DELETE, self._handle_powervc_volume_type_delete) # Volume type extra spec changes LOG.debug(_("Register event handler for %s event ") % constants.EVENT_VOLUME_TYPE_EXTRA_SPECS_UPDATE) - listener.register_handler([ + endpoint.register_handler([ constants.EVENT_VOLUME_TYPE_EXTRA_SPECS_UPDATE], self._handle_powervc_volume_type_extra_spec_update) LOG.debug(_("Register event handler for %s event ") % constants.EVENT_VOLUME_CREATE_END) - listener.register_handler([constants.EVENT_VOLUME_CREATE_END], + endpoint.register_handler([constants.EVENT_VOLUME_CREATE_END], self._handle_powervc_volume_create) LOG.debug(_("Register event handler for %s event ") % constants.EVENT_VOLUME_IMPORT_END) - listener.register_handler([constants.EVENT_VOLUME_IMPORT_END], + endpoint.register_handler([constants.EVENT_VOLUME_IMPORT_END], self._handle_powervc_volume_create) LOG.debug(_("Register event handler for %s event ") % constants.EVENT_VOLUME_DELETE_END) - listener.register_handler([constants.EVENT_VOLUME_DELETE_END], + endpoint.register_handler([constants.EVENT_VOLUME_DELETE_END], self._handle_powervc_volume_delete) LOG.debug(_("Register event handler for %s event ") % constants.EVENT_VOLUME_UPDATE) - listener.register_handler([constants.EVENT_VOLUME_UPDATE], + endpoint.register_handler([constants.EVENT_VOLUME_UPDATE], self._handle_powervc_volume_update) LOG.debug(_("Register event handler for %s event ") % constants.EVENT_VOLUME_ATTACH_END) - listener.register_handler([constants.EVENT_VOLUME_ATTACH_END], + endpoint.register_handler([constants.EVENT_VOLUME_ATTACH_END], self._handle_powervc_volume_update) LOG.debug(_("Register event handler for %s event ") % constants.EVENT_VOLUME_DETACH_END) - listener.register_handler([constants.EVENT_VOLUME_DETACH_END], + endpoint.register_handler([constants.EVENT_VOLUME_DETACH_END], self._handle_powervc_volume_update) + endpoints = [ + endpoint, + ] + LOG.debug("Starting to listen...... ") - conn.start() + + pvc_cinder_listener = listener.\ + get_notification_listener(trans, targets, endpoints, + allow_requeue=False) + messaging.start_notification_listener(pvc_cinder_listener) + LOG.debug("Exit _create_powervc_listeners method") def _periodic_volume_type_sync(self, context, vol_type_ids=None): @@ -373,16 +378,19 @@ class PowerVCCinderManager(service.Service): self.tg.add_timer(sync_interval, sync) - def _handle_powervc_volume_type_create(self, context, message): - """ - Handle instance create messages sent from PowerVC. + def _handle_powervc_volume_type_create(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Handle instance create messages sent from PowerVC. :param: context The security context - :param: message The AMQP message sent from OpenStack (dictionary) + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - LOG.debug("Handling notification: %s" % message.get('event_type')) - payload = message.get('payload') vol_type = payload.get('volume_types') if(vol_type is None): LOG.warning("Null volume type in volume.create notification") @@ -430,16 +438,19 @@ class PowerVCCinderManager(service.Service): if volume_backend_name == storage_hostname: self._insert_pvc_volume_type(context, vol_type) - def _handle_powervc_volume_type_delete(self, context, message): - """ - Handle instance delete messages sent from PowerVC. + def _handle_powervc_volume_type_delete(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Handle instance delete messages sent from PowerVC. :param: context The security context - :param: message The AMQP message sent from OpenStack (dictionary) + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - LOG.debug("Handling notification: %s" % message.get('event_type')) - payload = message.get('payload') vol_type = payload.get('volume_types') if(vol_type is None): LOG.warning("Null volume type, ignore volume.create notification") @@ -460,18 +471,21 @@ class PowerVCCinderManager(service.Service): # Remove the instance from the local OS self._unregister_volume_types(context, pvc_vol_type_id) - def _handle_powervc_volume_type_extra_spec_update(self, context, message): - """ - Handle instance state changes sent from PowerVC. This includes + def _handle_powervc_volume_type_extra_spec_update(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Handle instance state changes sent from PowerVC. This includes instance update and all other state changes caused by events like power on, power off, resize, live migration, and snapshot. :param: context The security context - :param: message The AMQP message sent from OpenStack (dictionary) + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - event_type = message.get('event_type') - LOG.debug("Handling notification: %s" % event_type) - payload = message.get('payload') + pvc_vol_type_id = payload.get('type_id') if(pvc_vol_type_id is None): LOG.debug('Null volume type id, ignore extra specs update') @@ -720,16 +734,19 @@ class PowerVCCinderManager(service.Service): break return found - def _handle_powervc_volume_create(self, context, message): - """ - Handle volume create messages sent from PowerVC. + def _handle_powervc_volume_create(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Handle volume create messages sent from PowerVC. :param: context The security context - :param: message The AMQP message sent from OpenStack (dictionary) + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - LOG.debug("Handling notification: %s" % message.get('event_type')) - payload = message.get('payload') pvc_volume_id = payload.get('volume_id') # If the volume already exists locally then ignore @@ -751,16 +768,19 @@ class PowerVCCinderManager(service.Service): LOG.debug('Volume not accessible, ignored!') return - def _handle_powervc_volume_delete(self, context, message): - """ - Handle volume create messages sent from PowerVC. + def _handle_powervc_volume_delete(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Handle volume create messages sent from PowerVC. :param: context The security context - :param: message The AMQP message sent from OpenStack (dictionary) + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - LOG.debug("Handling notification: %s" % message.get('event_type')) - payload = message.get('payload') pvc_volume_id = payload.get('volume_id') # If the volume does not already exist locally then ignore @@ -771,16 +791,19 @@ class PowerVCCinderManager(service.Service): self._unregister_volumes(context, local_volume.get('id')) - def _handle_powervc_volume_update(self, context, message): - """ - Handle volume create messages sent from PowerVC. + def _handle_powervc_volume_update(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Handle volume create messages sent from PowerVC. :param: context The security context - :param: message The AMQP message sent from OpenStack (dictionary) + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - LOG.debug("Handling notification: %s" % message.get('event_type')) - payload = message.get('payload') pvc_volume_id = payload.get('volume_id') local_volume = self._get_local_volume_by_pvc_id(context, pvc_volume_id) diff --git a/cinder-powervc/test/powervc/volume/manager/test_manager.py b/cinder-powervc/test/powervc/volume/manager/test_manager.py index c368756..1c7d7a9 100644 --- a/cinder-powervc/test/powervc/volume/manager/test_manager.py +++ b/cinder-powervc/test/powervc/volume/manager/test_manager.py @@ -29,7 +29,7 @@ fake_volume = {'display_name': 'fake_volume', 'instance_uuid': '', 'attach_status': ''} -fake_message = {'payload': {'volume_id': '', 'display_name': ''}} +fake_payload = {'volume_id': '', 'display_name': ''} fake_context = {} @@ -81,7 +81,8 @@ class Test(unittest.TestCase): self.moxer.ReplayAll() - self.manager._handle_powervc_volume_create(fake_context, fake_message) + self.manager._handle_powervc_volume_create(context=fake_context, + payload=fake_payload) self.moxer.UnsetStubs() self.moxer.VerifyAll() @@ -107,7 +108,8 @@ class Test(unittest.TestCase): self.moxer.ReplayAll() - self.manager._handle_powervc_volume_create(fake_context, fake_message) + self.manager._handle_powervc_volume_create(context=fake_context, + payload=fake_payload) self.moxer.UnsetStubs() self.moxer.VerifyAll() diff --git a/common-powervc/powervc/common/messaging.py b/common-powervc/powervc/common/messaging.py index 5046b48..23f9eba 100644 --- a/common-powervc/powervc/common/messaging.py +++ b/common-powervc/powervc/common/messaging.py @@ -1,301 +1,132 @@ -# Copyright 2013 IBM Corp. +# Copyright 2014 IBM Corp. -""" -This module contains Qpid connection utilities that can be used to connect -to a Qpid message broker and listen for notifications. - -Examples: - - # Import common messaging module - from powervc.common import messaging - - # Connect to host OS Qpid broker and handle instance update notifications. - conn = messaging.LocalConnection( - reconnect_handler=self.handle_qpid_reconnect) - listener = conn.create_listener('nova', 'notifications.info') - listener.register_handler('compute.instance.update', - self._handle_instance_update) - conn.start() - - # Connect to PowerVC Qpid broker and handle two event types with a single - # handler function. - conn = messaging.PowerVCConnection() - listener = conn.create_listener('nova', 'notifications.info') - listener.register_handler(['compute.instance.create.start', - 'compute.instance.create.end'], - self._handle_instance_create) - conn.start() - - # Connect to PowerVC Qpid broker and handle any instance notifications. - conn = messaging.PowerVCConnection() - listener = conn.create_listener('nova', 'notifications.info') - listener.register_handler('compute.instance.*', - self._handle_instance_notifications) - conn.start() +"""This module contains common structures and functions that help to handle +AMQP messages based on olso.messaging framework. """ import fnmatch -import json -import sys import threading -import traceback +import time -from time import sleep - -from qpid.messaging import Connection -from qpid.messaging.exceptions import ConnectionError - -from powervc.common import config - -from powervc.common.gettextutils import _ +from oslo.messaging.notify import dispatcher -def log(log, level, msg): +class NotificationEndpoint(object): + """Message listener endpoint, used to register handler functions, receive + and dispatch notification messages. """ - Log a message. + MSG_LEVEL = {0: 'AUDIT', 1: 'DEBUG', 2: 'INFO', 3: 'WARN', + 4: 'ERROR', 5: 'CRITICAL', 6: 'SAMPLE'} - :param: log The log to write to. - :param: level The logging level for the message - :param: msg The message to log - """ - if not log: - return - if level == 'critical': - log.critical(msg) - elif level == 'error': - log.error(msg) - elif level == 'warn': - log.warn(msg) - elif level == 'info': - log.info(msg) - elif level == 'debug': - log.debug(msg) + def __init__(self, log=None, sec_context=None): + """Create a NotificationEndpoint object, the core part of a listener. - -class QpidConnection(object): - """ - This class represents a connection to a Qpid broker. A QpidConnection must - be created in order to send or receive AMQP messages using a Qpid broker. - """ - - def __init__(self, url, username, password, transport='tcp', - reconnection_interval=60, reconnect_handler=None, - context=None, log=None): + :param: log logger used when handle messages. + :param: sec_context this is a security context contains keystone auth. + token for API access, not the context sent by message notifier. """ - Create a new connection to a Qpid message broker in order to send or - receive AMQP messages. - - :param: url URL for the Qpid connection, e.g. 9.10.49.164:5672 - :param: username Qpid username - :param: password Qpid password - :param: transport Transport mechanism, one of tcp, tcp+tls, - or ssl (alias for tcp+tls). - :param: reconnection_interval Interval in seconds between reconnect - attempts. - :param: reconnect_handler The function to call upon reconnecting to - the Qpid broker after connection was lost and - then reestablished. This function will be called after the - connections is reestablished but before the listeners are - started up again. It is not passed any parameters. - :param: context The security context - :param: log The logging module used for logging messages. If not - provided then no logging will be done. - """ - self.url = url - self.username = username - self.password = password - self.context = context - self.log = log.getLogger(__name__) if log else None - self.transport = transport - self.reconnection_interval = reconnection_interval - self.reconnect_handler = reconnect_handler - self._listeners = [] - self._is_connected = False - - def create_listener(self, exchange, topic): - """ - Create a new listener on the given exchange for the given topic. - - :param: exchange The name of the Qpid exchange, e.g. 'nova' - :param: topic The topic to listen for, e.g. 'notifications.info' - :returns: A new QpidListener that will listen for messages on the - given exchange and topic. - """ - listener = QpidListener(self, exchange, topic) - self._listeners.append(listener) - return listener - - def start(self, is_reconnect=False): - """ - Initiate the Qpid connection and start up any listeners. - - :param: is_reconnect True if this method is called as part of a - reconnect attempt, False otherwise - :raise: ConnectionError if a connection cannot be established - """ - # If the Qpid broker URL is not specified (or just the hostname is not - # specified) then we can't make a connection. - if not self.url or self.url.startswith(':'): - log(self.log, 'warn', _('Qpid broker not specified, cannot start ' - 'connection.')) - return - - if not self._is_connected: - self.conn = Connection(self.url, username=self.username, - password=self.password, - transport=self.transport) - try: - self.conn.open() - except ConnectionError as e: - log(self.log, 'critical', _('Cannot connect to Qpid message ' - 'broker: %s') % (e.message)) - # close this connection when encounter connection error - # otherwise, it will leave an ESTABLISHED connection - # to qpid server forever. - if self.conn is not None: - self.conn.close() - raise e - - self._is_connected = True - - if is_reconnect and self.reconnect_handler: - self.reconnect_handler() - - for listener in self._listeners: - listener._start(self.conn) - - log(self.log, 'info', _('Connected to Qpid message broker: ' - '%s@%s') % (self.username, self.url)) - - def _reconnect(self): - """ - Attempt to reconnect to the Qpid message broker in intervals until the - connection comes back. - """ - self.conn = None - - class ReconnectionThread(threading.Thread): - def __init__(self, qpid_connection): - super(ReconnectionThread, self).__init__( - name='ReconnectionThread') - self.qpid_connection = qpid_connection - - def run(self): - while not self.qpid_connection._is_connected: - try: - self.qpid_connection.start(is_reconnect=True) - except ConnectionError: - sleep(self.qpid_connection.reconnection_interval) - pass - - reconnection_thread = ReconnectionThread(self) - reconnection_thread.start() - - def set_reconnect_handler(self, reconnect_handler): - """ - Set the function to call upon reconnecting to the Qpid broker after - connection is lost and then reestablished. - - :param: reconnect_handler The function to call upon reconnecting. - """ - self.reconnect_handler = reconnect_handler - - -class PowerVCConnection(QpidConnection): - """ - This class represents a connection to the PowerVC Qpid broker as defined - in the configuration property files. - """ - - def __init__(self, reconnect_handler=None, context=None, log=None): - """ - Create a new connection to the PowerVC Qpid message broker in order - to send or receive AMQP messages. - - :param: reconnect_handler The function to call upon reconnecting to - the Qpid broker after connection was lost and - then reestablished. This function will be called after the - connection is reestablished but before the listeners are - started up again. It is not passed any parameters. - :param: context The security context - :param: log The logging module used for logging messages. If not - provided then no logging will be done. - """ - if config.AMQP_POWERVC_CONF.qpid_protocol == 'ssl': - transport = 'ssl' - else: - transport = 'tcp' - super(PowerVCConnection, - self).__init__('%s:%d' % ( - config.AMQP_POWERVC_CONF.qpid_hostname, - config.AMQP_POWERVC_CONF.qpid_port), - config.AMQP_POWERVC_CONF.qpid_username, - config.AMQP_POWERVC_CONF.qpid_password, - reconnect_handler=reconnect_handler, - context=context, log=log, transport=transport) - - -class LocalConnection(QpidConnection): - """ - This class represents a connection to the local OS Qpid broker as defined - in the configuration property files. - """ - - def __init__(self, reconnect_handler=None, context=None, log=None): - """ - Create a new connection to the local OS Qpid message broker in order - to send or receive AMQP messages. - - :param: reconnect_handler The function to call upon reconnecting to - the Qpid broker after connection was lost and - then reestablished. This function will be called after the - connection is reestablished but before the listeners are - started up again. It is not passed any parameters. - :param: context The security context - :param: log The logging module used for logging messages. If not - provided then no logging will be done. - """ - if config.AMQP_OPENSTACK_CONF.qpid_protocol == 'ssl': - transport = 'ssl' - else: - transport = 'tcp' - super(LocalConnection, - self).__init__('%s:%d' % ( - config.AMQP_OPENSTACK_CONF.qpid_hostname, - config.AMQP_OPENSTACK_CONF.qpid_port), - config.AMQP_OPENSTACK_CONF.qpid_username, - config.AMQP_OPENSTACK_CONF.qpid_password, - reconnect_handler=reconnect_handler, - context=context, log=log, transport=transport) - - -class QpidListener(object): - ''' - This class is used to listen for AMQP message notifications. It should - probably not be instantiated directly. First create a QpidConnection and - then add a QpidListener to the connection using the - QpidConnection.create_listener() method. - ''' - - def __init__(self, qpid_connection, exchange, topic): - """ - Create a new QpidListener object to listen for AMQP messages. - - :param: qpid_connection The QpidConnection object used for connecting - to the Qpid message broker. - :param: exchange The name of the Qpid exchange, e.g. 'nova' - :param: topic The topic to listen for, e.g. 'notifications.info' - """ - self.qpid_connection = qpid_connection - self.exchange = exchange - self.topic = topic self._handler_map = {} - self._count_since_acknowledge = 0 + self._log = log + self._sec_ctxt = sec_context + + def audit(self, ctxt, publisher_id, event_type, payload, metadata): + """Receive a notification at audit level.""" + return self._dispatch(0, ctxt, publisher_id, + event_type, payload, metadata) + + def debug(self, ctxt, publisher_id, event_type, payload, metadata): + """Receive a notification at debug level.""" + return self._dispatch(1, ctxt, publisher_id, + event_type, payload, metadata) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + """Receive a notification at info level.""" + return self._dispatch(2, ctxt, publisher_id, + event_type, payload, metadata) + + def warn(self, ctxt, publisher_id, event_type, payload, metadata): + """Receive a notification at warning level.""" + return self._dispatch(3, ctxt, publisher_id, + event_type, payload, metadata) + + def error(self, ctxt, publisher_id, event_type, payload, metadata): + """Receive a notification at error level.""" + return self._dispatch(4, ctxt, publisher_id, + event_type, payload, metadata) + + def critical(self, ctxt, publisher_id, event_type, payload, metadata): + """Receive a notification at critical level.""" + return self._dispatch(5, ctxt, publisher_id, + event_type, payload, metadata) + + def sample(self, ctxt, publisher_id, event_type, payload, metadata): + """Receive a notification at sample level. + + Sample notifications are for high-frequency events + that typically contain small payloads. eg: "CPU = 70%" + + Not all drivers support the sample level + (log, for example) so these could be dropped. + """ + return self._dispatch(6, ctxt, publisher_id, + event_type, payload, metadata) + + def _dispatch(self, level, ctxt, publisher_id, event_type, payload, + metadata): + """Route message to handlers according event_type registered. + """ + + handlers = self._get_handlers(event_type) + try: + if handlers: + if self._log and self._log.isEnabledFor('INFO'): + self._log.info("'%s' level '%s' type message is received. " + "Routing to handlers..." + % (self.MSG_LEVEL[level], event_type) + ) + for handler in handlers: + start_time = time.time() + handler(context=self._sec_ctxt, + ctxt=ctxt, + event_type=event_type, + payload=payload, + ) + end_time = time.time() + if self._log and self._log.isEnabledFor('DEBUG'): + self._log.debug("handler '%s' uses '%f' time(s)" + % (handler, end_time - start_time) + ) + return dispatcher.NotificationResult.HANDLED + except Exception: + self._log.exception("Error handling '%(level)s' level '%(type)s' " + "type message '%(msg)s'." + % {'level': self.MSG_LEVEL[level], + 'type': event_type, + 'msg': payload, + } + ) + # TODO(gpanda): consider if requeue is needed in the future, + # not all transport drivers implement support for requeueing, if + # the driver does not support requeueing, it will raise + # NotImplementedError. As far as I tested(oslo.messaging 1.3.1 + + # qpidd 0.14), it doesn't support. + # return dispatcher.NotificationResult.REQUEUE + finally: + pass + + def _get_handlers(self, event_type): + """Get a list of all the registered handlers that match the given event + type. + """ + handlers = [] + for event_type_pattern in self._handler_map: + if fnmatch.fnmatch(event_type, event_type_pattern): + handlers.append(self._handler_map.get(event_type_pattern)) + return handlers def register_handler(self, event_type, handler): - """ - Register a handler function for one or more message notification event - types. The handler function will be called when a message is + """Register a handler function for one or more message notification + event types. The handler function will be called when a message is received that matches the event type. The handler function will be passed two arguments: the security context and a dictionary containing the message attributes. The message attributes include: event_type, @@ -333,173 +164,15 @@ class QpidListener(object): for et in event_type: self._handler_map[et] = handler - def unregister_handler(self, event_type): - """ - Stop handling the given message notification event type. - :param: event_type The event type to unregister - """ - try: - self._handler_map.pop(event_type) - except KeyError: - log(self.qpid_connection.log, 'warn', - _('There is no handler for this event type: %s') % event_type) +def start_notification_listener(notification_listener): + def _run(): + notification_listener.start() + notification_listener.wait() - def _start(self, connection): - """ - Start listening for messages. This method should probably not be called - directly. After creating a QpidConnection and adding listeners using - the create_listener() method, use the QpidConnection.start() method to - start listening for messages. The QpidConnection will start up all of - the listeners. - - :param: connection The qpid.messaging.endpoints.Connection object used - to establish the connection to the message broker. - """ - self.session = connection.session('%s/%s' % - (self.exchange, self.topic)) - addr_opts = { - "create": "always", - "node": { - "type": "topic", - "x-declare": { - "durable": True, - "auto-delete": True - }, - }, - } - - connection_info = "%s / %s ; %s" % (self.exchange, self.topic, - json.dumps(addr_opts)) - self.receiver = self.session.receiver(connection_info) - log(self.qpid_connection.log, 'debug', - _('QpidListener session info: %s') % (json.dumps(connection_info))) - - """ - A listener blocks while it waits for the next message on the queue, - so we initiate a thread to run the listening function. - """ - t = threading.Thread(target=self._listen) - t.start() - - def _has_more_messages(self): - ''' - Determine if there are any new messages in the queue. - - :returns: True if there are messages on the queue, False otherwise - ''' - return bool(self.receiver) - - def _next_message(self): - ''' - Wait for the next message on the queue. - - :returns: The raw message object from the message queue - ''' - return self.receiver.fetch() - - def _acknowledge(self): - ''' - Acknowledge a message has been received. - ''' - self.session.acknowledge(sync=False) - - def _get_handlers(self, event_type): - """ - Get a list of all the registered handlers that match the given event - type. - """ - handlers = [] - for event_type_pattern in self._handler_map: - if fnmatch.fnmatch(event_type, event_type_pattern): - handlers.append(self._handler_map.get(event_type_pattern)) - return handlers - - def _dispatch(self, message): - ''' - Dispatch a message to its specific handler. - - :param: message A dictionary containing the OpenStack message - notification attributes (event_type, timestamp, - message_id, priority, publisher_id, payload) - ''' - event_type = message.get('event_type') - handlers = self._get_handlers(event_type) - log_ = self.qpid_connection.log - self._count_since_acknowledge += 1 - - try: - if handlers: - log(log_, 'debug', _('Dispatching message to handlers')) - log(log_, 'info', _('Qpid listener received ' - 'message of event type: %s' - % message['event_type'])) - for handler in handlers: - handler(self.qpid_connection.context, message) - except Exception, e: - log(log_, 'error', _('Error handling message: %s: %s. Message: ' - '%s.') % (Exception, e, message)) - - # Print stack trace - exc_type, exc_value, exc_traceback = sys.exc_info() - log(log_, 'error', _('error type %s') % (exc_type)) - log(log_, 'error', _('error object %s') % (exc_value)) - log(log_, 'error', ''.join(traceback.format_tb(exc_traceback))) - finally: - if self._count_since_acknowledge > 100: - self._count_since_acknowledge = 0 - self._acknowledge() - - def _resolve_message(self, raw_message): - ''' - Resolves the given raw message obtained from the Qpid message queue - into a message that can be dispatched to a handler function. - - :param: raw_message A raw message obtained from the Qpid message - queue - :returns: A dictionary containing the following keys: - event_type, timestamp, message_id, priority, publisher_id, payload - ''' - content_type = raw_message.content_type - if content_type == 'application/json; charset=utf8': - content = json.loads(raw_message.content) - elif content_type == 'amqp/map': - content = raw_message.content - else: - log(self.qpid_connection.log, - 'warn', - _('Qpid listener received unsupported message: ' - '%s\nwith content_type %s') % (raw_message.content, - content_type)) - return None - message = dict() - for attr in ['event_type', 'timestamp', 'message_id', 'priority', - 'publisher_id', 'payload']: - message[attr] = content.get(attr) - log(self.qpid_connection.log, 'debug', _('Qpid listener received ' - 'message: %s') % (message)) - return message - - def _listen(self): - ''' - Handle messages when they arrive on the message queue. - ''' - while True: - try: - if self._has_more_messages(): - raw_message = self._next_message() - message = self._resolve_message(raw_message) - if message is None: - continue - self._dispatch(message) - else: - break - except ConnectionError, e: - log(self.qpid_connection.log, 'warn', - _("Connection error: %s") % (e)) - self.qpid_connection._is_connected = False - self.qpid_connection._reconnect() - break - except Exception, e: - log(self.qpid_connection.log, 'warn', - _("Unknown error happens for event listener: %s") % (e)) + """ + A listener blocks while it waits for the next message on the queue, + so we initiate a thread to run the listening function. + """ + t = threading.Thread(target=_run) + t.start() diff --git a/common-powervc/test/common/test_messaging.py b/common-powervc/test/common/test_messaging.py deleted file mode 100644 index d9cda73..0000000 --- a/common-powervc/test/common/test_messaging.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2013 IBM Corp. -import unittest -from powervc.common.messaging import QpidConnection - - -class QpidTest(unittest.TestCase): - - def setUp(self): - super(QpidTest, self).setUp() - self.conn = QpidConnection(url='127.0.0.1:5989', - username='test_username', - password='test_passwd', - transport='tcp', - reconnection_interval=60, - reconnect_handler=None, - context=None, - log=None) - - def test_create_listener(self): - self.listener = self.conn.\ - create_listener('test_exchange', 'test_topic') - self.assertNotEqual(self.listener, None) - self.assertEqual([self.listener], self.conn._listeners) - - def test_register_handler(self): - def _fake_handler(): - pass - - if not hasattr(self, 'listener'): - self.listener = self.conn.\ - create_listener('test_exchange', 'test_topic') - - self.listener.register_handler('foo.bar.*', _fake_handler) - self.assertEqual(self.listener._handler_map['foo.bar.*'], - _fake_handler) - - def test_unregister_handler(self): - def _fake_handler(): - pass - - if not hasattr(self, 'listener'): - self.listener = self.conn.\ - create_listener('test_exchange', 'test_topic') - - self.listener.register_handler('foo.bar.*', _fake_handler) - self.assertEqual(self.listener._handler_map['foo.bar.*'], - _fake_handler) - self.listener.unregister_handler('foo.bar.*') - self.assertEqual(self.listener._handler_map, - {}) - - def tearDown(self): - unittest.TestCase.tearDown(self) - -if __name__ == "__main__": - unittest.main() diff --git a/glance-powervc/powervc/glance/common/constants.py b/glance-powervc/powervc/glance/common/constants.py index aaa2b58..a7ee63f 100644 --- a/glance-powervc/powervc/glance/common/constants.py +++ b/glance-powervc/powervc/glance/common/constants.py @@ -66,6 +66,9 @@ LOCAL = 'local' EVENT_TYPE = 'type' EVENT_CONTEXT = 'context' EVENT_MESSAGE = 'message' +EVENT_PAYLOAD = 'payload' +REAL_EVENT_TYPE = 'real_type' +REAL_EVENT_CONTEXT = 'ctxt' # Event queue event types LOCAL_IMAGE_EVENT = LOCAL @@ -77,7 +80,7 @@ STARTUP_SCAN_EVENT = 'startup' IMAGE_EVENT_EXCHANGE = 'glance' # Image notification event topic -IMAGE_EVENT_TOPIC = 'notifications.info' +IMAGE_EVENT_TOPIC = 'notifications' # Image notification event types IMAGE_EVENT_TYPE_ALL = 'image.*' diff --git a/glance-powervc/powervc/glance/manager/manager.py b/glance-powervc/powervc/glance/manager/manager.py index d1c393f..a364c60 100644 --- a/glance-powervc/powervc/glance/manager/manager.py +++ b/glance-powervc/powervc/glance/manager/manager.py @@ -11,7 +11,6 @@ import Queue import threading import itertools from operator import itemgetter -import HTMLParser from powervc.common import config @@ -23,7 +22,6 @@ from glanceclient.exc import CommunicationError from glanceclient.exc import HTTPNotFound from powervc.common import constants as consts -from powervc.common import messaging from powervc.common.exception import StorageConnectivityGroupNotFound from powervc.common.gettextutils import _ from powervc.common.client import factory as clients @@ -31,6 +29,12 @@ from powervc.glance.common import constants from powervc.glance.common import config as glance_config from powervc.common import utils +from powervc.common import messaging + +from oslo.messaging.notify import listener +from oslo.messaging import target +from oslo.messaging import transport + CONF = glance_config.CONF LOG = logging.getLogger(__name__) @@ -2305,9 +2309,8 @@ class PowerVCImageManager(service.Service): self._start_pvc_event_handler() def _start_local_event_handler(self): - """ - Start the local hosting OS image notification event handler if it's not - already running. + """Start the local hosting OS image notification event handler if it's + not already running. The event handler is not started if the qpid_hostname is not specified in the configuration. @@ -2317,45 +2320,35 @@ class PowerVCImageManager(service.Service): if self.local_event_handler_running: return - def local_event_reconnect_handler(): - """ - The reconnect handler will start a periodic scan operation. - """ - LOG.info(_("Processing local event handler reconnection...")) - self._add_periodic_sync_to_queue() + LOG.debug("Enter _start_local_event_handler method") - try: + trans = transport.get_transport(config.AMQP_OPENSTACK_CONF) + targets = [ + target.Target(exchange=constants.IMAGE_EVENT_EXCHANGE, + topic=constants.IMAGE_EVENT_TOPIC) + ] + endpoint = messaging.NotificationEndpoint(log=LOG) - # See if the host is specified. If not, do not attempt to connect - # and register the event handler - host = config.AMQP_OPENSTACK_CONF.qpid_hostname - if host and host is not None: - local_conn = messaging.LocalConnection( - reconnect_handler=local_event_reconnect_handler, - log=logging) - local_listener = local_conn.create_listener( - constants.IMAGE_EVENT_EXCHANGE, - constants.IMAGE_EVENT_TOPIC) + endpoint.register_handler(constants.IMAGE_EVENT_TYPE_ALL, + self._local_image_notifications) - # Register the handler to begin processing messages - local_listener.register_handler( - constants.IMAGE_EVENT_TYPE_ALL, - self._local_image_notifications) - local_conn.start() - LOG.info(_('Monitoring local hosting OS for Image ' - 'notification events...')) - self.local_event_handler_running = True - else: - LOG.warning(_('Local hosting OS image event handling could ' - 'not be started because the qpid_host was not ' - 'specified in the configuration file.')) - except Exception as e: - LOG.exception(_('An error occurred starting the local hosting OS ' - 'image notification event handler: %s'), e) + endpoints = [ + endpoint, + ] + + LOG.debug("Starting to listen...... ") + + local_glance_listener = listener.\ + get_notification_listener(trans, targets, endpoints, + allow_requeue=False) + messaging.start_notification_listener(local_glance_listener) + + LOG.debug("Exit _start_local_event_handler method") + + self.local_event_handler_running = True def _start_pvc_event_handler(self): - """ - Start the PowerVC image notification event handler if not already + """Start the PowerVC image notification event handler if not already running. The event handler is not started if the powervc_qpid_hostname is @@ -2366,41 +2359,32 @@ class PowerVCImageManager(service.Service): if self.pvc_event_handler_running: return - def pvc_event_reconnect_handler(): - """ - The reconnect handler will start a periodic scan operation. - """ - LOG.info(_("Processing PowerVC event handler reconnection...")) - self._add_periodic_sync_to_queue() + LOG.debug("Enter _start_pvc_event_handler method") - try: + trans = transport.get_transport(config.AMQP_POWERVC_CONF) + targets = [ + target.Target(exchange=constants.IMAGE_EVENT_EXCHANGE, + topic=constants.IMAGE_EVENT_TOPIC) + ] + endpoint = messaging.NotificationEndpoint(log=LOG) - # See if the host is specified. If not, do not attempt to connect - # and register the event handler - host = config.AMQP_POWERVC_CONF.qpid_hostname - if host and host is not None: - pvc_conn = messaging.PowerVCConnection( - reconnect_handler=pvc_event_reconnect_handler, log=logging) - pvc_listener = pvc_conn.create_listener( - constants.IMAGE_EVENT_EXCHANGE, - constants.IMAGE_EVENT_TOPIC) + endpoint.register_handler(constants.IMAGE_EVENT_TYPE_ALL, + self._pvc_image_notifications) - # Register the handler to begin processing messages - pvc_listener.register_handler( - constants.IMAGE_EVENT_TYPE_ALL, - self._pvc_image_notifications) - pvc_conn.start() - LOG.info(_('Monitoring PowerVC for Image notification ' - 'events...')) - self.pvc_event_handler_running = True - else: - LOG.warning(_('PowerVC image event handling could not be ' - 'started because the powervc_qpid_host was not ' - 'specified in the configuration file.')) + endpoints = [ + endpoint, + ] - except Exception as e: - LOG.exception(_('An error occurred starting the PowerVC image ' - 'notification event handler: %s'), e) + LOG.debug("Starting to listen...... ") + + pvc_glance_listener = listener.\ + get_notification_listener(trans, targets, endpoints, + allow_requeue=False) + messaging.start_notification_listener(pvc_glance_listener) + + LOG.debug("Exit _start_pvc_event_handler method") + + self.pvc_event_handler_running = True def _process_event_queue(self): """ @@ -2421,17 +2405,28 @@ class PowerVCImageManager(service.Service): str(self.local_events_to_ignore_dict)) LOG.debug(_('pvc events to ignore: %s'), str(self.pvc_events_to_ignore_dict)) - event_type = event.get(constants.EVENT_TYPE) context = event.get(constants.EVENT_CONTEXT) - message = event.get(constants.EVENT_MESSAGE) + event_type = event.get(constants.EVENT_TYPE) + ctxt = event.get(constants.REAL_EVENT_CONTEXT) + real_type = event.get(constants.REAL_EVENT_TYPE) + payload = event.get(constants.EVENT_PAYLOAD) if event_type == constants.LOCAL_IMAGE_EVENT: LOG.debug(_('Processing a local hostingOS image event on ' 'the event queue: %s'), str(event)) - self._handle_local_image_notifications(context, message) + self.\ + _handle_local_image_notifications(context=context, + ctxt=ctxt, + event_type=real_type, + payload=payload, + ) elif event_type == constants.PVC_IMAGE_EVENT: LOG.debug(_('Processing a PowerVC image event on ' 'the event queue: %s'), str(event)) - self._handle_pvc_image_notifications(context, message) + self._handle_pvc_image_notifications(context=context, + ctxt=ctxt, + event_type=real_type, + payload=payload, + ) elif event_type == constants.PERIODIC_SCAN_EVENT: LOG.debug(_('Processing a periodic sync event on ' 'the event queue: %s'), str(event)) @@ -2449,23 +2444,35 @@ class PowerVCImageManager(service.Service): finally: self.event_queue.task_done() - def _local_image_notifications(self, context, message): - """ - Place the local image event on the event queue for processing. + def _local_image_notifications(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Place the local image event on the event queue for processing. - :param: context The event security context - :param: message The event message + :param: context The security context + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ event = {} event[constants.EVENT_TYPE] = constants.LOCAL_IMAGE_EVENT event[constants.EVENT_CONTEXT] = context - event[constants.EVENT_MESSAGE] = message + event[constants.REAL_EVENT_CONTEXT] = ctxt + event[constants.REAL_EVENT_TYPE] = event_type + event[constants.EVENT_PAYLOAD] = payload + LOG.debug(_('Adding local image event to event queue: %s'), str(event)) self.event_queue.put(event) - def _handle_local_image_notifications(self, context, message): - """ - Handle image notification events received from the local hosting OS. + def _handle_local_image_notifications(self, + context=None, + ctxt=None, + event_type=None, + payload=None, + ): + """Handle image notification events received from the local hosting OS. Only handle update, and delete event types. The activate event is processed, but only to add the new image to the update_at dict. @@ -2474,14 +2481,13 @@ class PowerVCImageManager(service.Service): event from PowerVC to the ignore list. Then when that event arrives from PowerVC because of this update we will ignore it. - :param: context The event security context - :param: message The event message + :param: context The security context + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - if message is None: - LOG.debug(_('The local image event notification had no message!')) - return - event_type = message.get('event_type') - v1image_dict = message.get('payload') + + v1image_dict = payload if event_type == constants.IMAGE_EVENT_TYPE_UPDATE: self._process_local_image_update_event(v1image_dict) elif event_type == constants.IMAGE_EVENT_TYPE_DELETE: @@ -2491,7 +2497,11 @@ class PowerVCImageManager(service.Service): elif event_type == constants.IMAGE_EVENT_TYPE_CREATE: self._process_local_image_create_event(v1image_dict) else: - LOG.debug(_('Did not process event: %s'), str(message)) + LOG.debug(_("Did not process event: type:'%(event_type)s' type, " + "payload:'%(payload)s'" + ) + % (event_type, payload) + ) def _process_local_image_update_event(self, v1image_dict): """ @@ -2815,24 +2825,37 @@ class PowerVCImageManager(service.Service): '\'%s\'. The PowerVC UUID is not known.'), local_name) - def _pvc_image_notifications(self, context, message): - """ - Place the PowerVC image event on the event queue for processing. + def _pvc_image_notifications(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Place the PowerVC image event on the event queue for processing. - :param: context The event security context - :param: message The event message + :param: context The security context + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ + event = {} event[constants.EVENT_TYPE] = constants.PVC_IMAGE_EVENT event[constants.EVENT_CONTEXT] = context - event[constants.EVENT_MESSAGE] = message + event[constants.REAL_EVENT_CONTEXT] = ctxt + event[constants.REAL_EVENT_TYPE] = event_type + event[constants.EVENT_PAYLOAD] = payload + LOG.debug(_('Adding PowerVC image event to event queue: %s'), str(event)) self.event_queue.put(event) - def _handle_pvc_image_notifications(self, context, message): - """ - Handle image notification events received from PowerVC. + def _handle_pvc_image_notifications(self, + context=None, + ctxt=None, + event_type=None, + payload=None, + ): + """Handle image notification events received from PowerVC. Only handle activate, update, and delete event types. There is a scheme in place to keep events from ping-ponging back @@ -2841,15 +2864,13 @@ class PowerVCImageManager(service.Service): that event arrives from the hosting OS because of this update we will ignore it. - :param: context The event security context - :param: message The event message + :param: context The security context + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - if message is None: - LOG.debug(_('The PowerVC image event notification had no ' - 'message!')) - return - event_type = message.get('event_type') - v1image_dict = message.get('payload') + + v1image_dict = payload if event_type == constants.IMAGE_EVENT_TYPE_UPDATE: self._process_pvc_image_update_event(v1image_dict) elif event_type == constants.IMAGE_EVENT_TYPE_DELETE: @@ -2857,7 +2878,11 @@ class PowerVCImageManager(service.Service): elif event_type == constants.IMAGE_EVENT_TYPE_ACTIVATE: self._process_pvc_image_activate_event(v1image_dict) else: - LOG.debug(_('Did not process event: %s'), str(message)) + LOG.debug(_("Did not process event: type:'%(event_type)s' type, " + "payload:'%(payload)s'" + ) + % (event_type, payload) + ) def _process_pvc_image_update_event(self, v1image_dict): """ diff --git a/glance-powervc/test/test_messaging.py.fails b/glance-powervc/test/test_messaging.py.fails deleted file mode 100644 index 774807b..0000000 --- a/glance-powervc/test/test_messaging.py.fails +++ /dev/null @@ -1,246 +0,0 @@ -# Copyright 2013 IBM Corp. - -import sys -import itertools -import time -import traceback - -from oslo.config import cfg -from glance.openstack.common import gettextutils -gettextutils.install('glance') -import glance.openstack.common.log as logging -from glance.common import config as logging_config -from glanceclient.v1 import images as v1images - -from powervc.common import config -from powervc.glance.common import constants - -# PowerVC Driver ImageManager specific configuration -image_opts = [ - - # The image period sync interval in seconds - cfg.IntOpt('image_periodic_sync_interval_in_seconds', - default=constants.IMAGE_PERIODIC_SYNC_INTERVAL_IN_SECONDS) -] - -CONF = config.CONF -CONF.register_opts(image_opts, 'powervc') - -LOG = logging.getLogger(__name__) - -config.parse_power_config(sys.argv, 'glance') - -from powervc.common import messaging -from powervc.common import constants as consts -import powervc.common.client.factory as clients - - -def test_image_events(wait_forever=True): - - def local_reconnect(): - LOG.debug(_('Re-established connection to local hosting OS ' - 'Qpid broker')) - - local_conn = messaging.LocalConnection(log=logging, - reconnect_handler=local_reconnect) -# local_conn = messaging.QpidConnection('localhost:5672', \ -# 'admin', 'ICA1NTQxNzI5ODgK') -# conn = messaging.QpidConnection('localhost:5672', 'admin', 'openstack1') - local_listener = local_conn.create_listener('glance', 'notifications.info') - local_listener.register_handler('image.*', - handle_local_image_notifications) - local_conn.start() - -# pvc_conn = messaging.QpidConnection('9.5.125.55:5672', \ -# 'anonymous', '') - - def pvc_reconnect(): - LOG.debug(_('Re-established connection to PowerVC Qpid broker')) - - pvc_conn = messaging.PowerVCConnection(log=logging, - reconnect_handler=pvc_reconnect) - -# pvc_conn = messaging.QpidConnection('9.5.125.55:5672', \ -# 'root', 'passw0rd') - pvc_listener = pvc_conn.create_listener('glance', 'notifications.info') - pvc_listener.register_handler('image.*', - handle_pvc_image_notifications) - pvc_conn.start() - - print 'Monitoring hosting OS and PowerVC for Image notifications...' - while wait_forever: - time.sleep(5) - - -def test_pvc_image_events(wait_forever=True): - -# pvc_conn = messaging.QpidConnection('9.5.125.55:5672', \ -# 'anonymous', '') - - def pvc_reconnect(): - LOG.debug(_('Re-established connection to PowerVC Qpid broker')) - - pvc_conn = messaging.PowerVCConnection(log=logging, - reconnect_handler=pvc_reconnect) - -# pvc_conn = messaging.QpidConnection('9.5.125.55:5672', \ -# 'root', 'passw0rd') - pvc_listener = pvc_conn.create_listener('glance', 'notifications.info') - pvc_listener.register_handler('image.*', - handle_pvc_image_notifications) - pvc_conn.start() - - print 'Monitoring PowerVC for Image notifications...' - while wait_forever: - time.sleep(5) - - -def handle_local_image_notifications(context, message): - print '=' * 80 - print 'LOCAL:', str(context) - print 'LOCAL:', str(message) - image = message.get('payload') # should be the v1 image as a dict - dump_image(image) - print '=' * 80 - - -def handle_pvc_image_notifications(context, message): - print '=' * 80 - print 'PVC:', str(context) - print 'PVC:', str(message) - image = message.get('payload') # should be the v1 image as a dict - dump_image(image) - print '=' * 80 - - -def dump_image(image_dict): - for v1imagekey in image_dict.keys(): - print v1imagekey, '=', image_dict.get(v1imagekey) - props = image_dict.get('properties') - if props: - for v1imageprop in props.keys(): - print 'property: ', v1imageprop, '=',\ - props.get(v1imageprop) - - -def test_update_local_image(image_id): - params = {} - filters = {} - filters['is_public'] = False - params['filters'] = filters - local_v1client = \ - clients.LOCAL.get_client(str(consts.SERVICE_TYPES.image), 'v1') - v1local_images = local_v1client.images - image = \ - get_v1image_from_id(image_id, itertools.chain( - v1local_images.list(), v1local_images.list(**params))) - if image: - field_dict, patch_dict = get_v1image_update_fields(image) - if 'is_public' in field_dict.keys(): - public = field_dict['is_public'] - field_dict['is_public'] = not public - v1local_images.update(image, **field_dict) - if len(patch_dict) > 0: - local_v2client = \ - clients.LOCAL.get_client(str(consts.SERVICE_TYPES.image), 'v2') - v2local_images = local_v2client.images - v2local_images.update(image.id, **patch_dict) - print 'Image', image.name, 'updated.' - else: - print 'Image', image_id, 'not found!' - - -def get_v1image_update_fields(image): - """ - Get the properties for an image update - - :param: image The image to pull properties from to be used - for an image update operation. - :returns: A tuple containing with the dict containing the - properties to use for an image update operation, - and the dict of the properties that are too - large to be processed by v1 Image APIs. Those - properties should be updated using the - v2 Image PATCH API. - """ - field_dict = {} - patch_dict = {} - props = image.properties - if props and props is not None: - patch_dict = remove_large_properties(props) - image.properties = props - image_dict = image.to_dict() - for imagekey in image_dict.keys(): - if imagekey in v1images.UPDATE_PARAMS and \ - imagekey not in constants.IMAGE_UPDATE_PARAMS_FILTER: - field_value = image_dict.get(imagekey) - if field_value is not None: - if len(str(field_value)) < constants.MAX_HEADER_LEN_V1: - field_dict[imagekey] = field_value - else: - patch_dict[imagekey] = field_value - return field_dict, patch_dict - - -def remove_large_properties(properties): - """ - Remove any properties that are too large to be processed by - the v1 APIs and return them in a dict to the caller. The properties - passed in are also modified. - - :param: properties. The properties dict to remove large properties - from. Large properties are removed from the original - properties dict - :returns: A dict containing properties that are too large to - be processed by v1 Image APIs - """ - too_large_properties = {} - if properties and properties is not None: - for propkey in properties.keys(): - propvalue = properties.get(propkey) - if propvalue and propvalue is not None: - if properties.get(propkey) and (len(str(propvalue)) >= - constants.MAX_HEADER_LEN_V1): - too_large_properties[propkey] = properties.pop(propkey) - return too_large_properties - - -def test_delete_local_image(image_id): - pass - - -def get_v1image_from_id(image_id, v1images): - """ - Get a v1 Image from an image id. - - :param: image_id The image id - :param: v1images The image manager used to obtain images from the - v1 glance client - :returns: The image for the specified id or None if not found. - """ - for image in v1images: - if image and image.id == image_id: - return image - return None - -""" -Main test entry point -""" -if __name__ == '__main__': - try: - # turn off debug logging -# CONF.debug = False - logging_config.setup_logging() - logging.setup('powervc') - - # test getting the staging project id -# test_image_events(wait_forever=True) - test_pvc_image_events(wait_forever=True) -# image_id = '3060d198-c951-4693-9b1d-6314ac0539bf' -# test_update_local_image(image_id) -# test_delete_local_image(image_id) - - print 'Tests done!' - except Exception: - traceback.print_exc() - raise diff --git a/neutron-powervc/etc/powervc-neutron.conf b/neutron-powervc/etc/powervc-neutron.conf index 3275250..86ca958 100644 --- a/neutron-powervc/etc/powervc-neutron.conf +++ b/neutron-powervc/etc/powervc-neutron.conf @@ -2,45 +2,6 @@ debug = False verbose = True -# The messaging module to use, defaults to kombu. -# rpc_backend = neutron.openstack.common.rpc.impl_kombu -# AMQP password -# rabbit_password = openstack1 -# AMQP host -# rabbit_host = localhost -# Size of RPC thread pool -# rpc_thread_pool_size = 64 -# Size of RPC connection pool -# rpc_conn_pool_size = 30 -# Seconds to wait for a response from call or multicall -# rpc_response_timeout = 60 -# Seconds to wait before a cast expires (TTL). Only supported by impl_zmq. -# rpc_cast_timeout = 30 -# Modules of exceptions that are permitted to be recreated -# upon receiving exception data from an rpc call. -# allowed_rpc_exception_modules = neutron.openstack.common.exception, nova.exception -# AMQP exchange to connect to if using RabbitMQ or QPID -#control_exchange = nova - -# QPID -# rpc_backend=neutron.openstack.common.rpc.impl_qpid -# Qpid broker hostname -# qpid_hostname = localhost -# Qpid broker port -# qpid_port = 5672 -# Username for qpid connection -# qpid_username = qpidclient -# Password for qpid connection -# qpid_password = openstack1 -# Space separated list of SASL mechanisms to use for auth -# qpid_sasl_mechanisms = '' -# Seconds between connection keepalive heartbeats -# qpid_heartbeat = 60 -# Transport to use, either 'tcp' or 'ssl' -# qpid_protocol = tcp -# Disable Nagle algorithm -# qpid_tcp_nodelay = True - [AGENT] # Agent's polling interval in seconds polling_interval = 60 diff --git a/neutron-powervc/powervc/neutron/client/local_os_bindings.py b/neutron-powervc/powervc/neutron/client/local_os_bindings.py index 2685966..2091252 100644 --- a/neutron-powervc/powervc/neutron/client/local_os_bindings.py +++ b/neutron-powervc/powervc/neutron/client/local_os_bindings.py @@ -12,7 +12,6 @@ Created on Aug 1, 2013 from neutron.openstack.common import log as logging -from powervc.common import messaging from powervc.common.client import factory from powervc.common.constants import SERVICE_TYPES from powervc.common.constants import LOCAL_OS @@ -22,6 +21,13 @@ from powervc.neutron.common import constants from powervc.neutron.common import utils from powervc.neutron.db import powervc_db_v2 +from powervc.common import config as cfg +from powervc.common import messaging + +from oslo.messaging.notify import listener +from oslo.messaging import target +from oslo.messaging import transport + LOG = logging.getLogger(__name__) @@ -44,38 +50,54 @@ class Client(neutron_client_bindings.Client): def _create_amqp_listeners(self): """Listen for AMQP messages from the local OS""" - LOG.debug(_('Creating AMQP listeners')) - def reconnect(): - LOG.info(_('Re-established connection to local OS Qpid broker')) - self.agent.queue_event(self.os, constants.EVENT_FULL_SYNC, None) + LOG.debug("Enter _create_amqp_listeners(local) method") - connection = messaging.LocalConnection(log=logging, - reconnect_handler=reconnect) - listener = connection.create_listener(constants.QPID_EXCHANGE, - constants.QPID_TOPIC) - listener.register_handler(constants.EVENT_NETWORK_CREATE, + trans = transport.get_transport(cfg.AMQP_OPENSTACK_CONF) + targets = [ + target.Target(exchange=constants.QPID_EXCHANGE, + topic=constants.QPID_TOPIC) + ] + endpoint = messaging.NotificationEndpoint(log=LOG) + + endpoint.register_handler(constants.EVENT_NETWORK_CREATE, self._handle_network_create) - listener.register_handler(constants.EVENT_NETWORK_UPDATE, + endpoint.register_handler(constants.EVENT_NETWORK_UPDATE, self._handle_network_update) - listener.register_handler(constants.EVENT_NETWORK_DELETE, + endpoint.register_handler(constants.EVENT_NETWORK_DELETE, self._handle_network_delete) - listener.register_handler(constants.EVENT_SUBNET_CREATE, + endpoint.register_handler(constants.EVENT_SUBNET_CREATE, self._handle_subnet_create) - listener.register_handler(constants.EVENT_SUBNET_UPDATE, + endpoint.register_handler(constants.EVENT_SUBNET_UPDATE, self._handle_subnet_update) - listener.register_handler(constants.EVENT_SUBNET_DELETE, + endpoint.register_handler(constants.EVENT_SUBNET_DELETE, self._handle_subnet_delete) - listener.register_handler(constants.EVENT_PORT_CREATE, + endpoint.register_handler(constants.EVENT_PORT_CREATE, self._handle_port_create) - listener.register_handler(constants.EVENT_PORT_UPDATE, + endpoint.register_handler(constants.EVENT_PORT_UPDATE, self._handle_port_update) - listener.register_handler(constants.EVENT_PORT_DELETE, + endpoint.register_handler(constants.EVENT_PORT_DELETE, self._handle_port_delete) - connection.start() - def _handle_network_create(self, context, message): - event, payload = self._extact_event_payload(message) + endpoints = [ + endpoint, + ] + + LOG.debug("Starting to listen...... ") + + local_neutron_listener = listener.\ + get_notification_listener(trans, targets, endpoints, + allow_requeue=False) + messaging.start_notification_listener(local_neutron_listener) + + LOG.debug("Exit _create_amqp_listeners(local) method") + + def _handle_network_create(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + network = payload.get('network') network_id = network.get('id') if not utils.is_network_mappable(network): @@ -85,20 +107,32 @@ class Client(neutron_client_bindings.Client): if db_net: LOG.info(_("DB entry for network %s already exists"), network_id) return - self.agent.queue_event(self.os, event, network) + self.agent.queue_event(self.os, event_type, network) + + def _handle_network_update(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_network_update(self, context, message): - event, payload = self._extact_event_payload(message) network = payload.get('network') - self.agent.queue_event(self.os, event, network) + self.agent.queue_event(self.os, event_type, network) + + def _handle_network_delete(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_network_delete(self, context, message): - event, payload = self._extact_event_payload(message) network_id = payload.get('network_id') - self.agent.queue_event(self.os, event, network_id) + self.agent.queue_event(self.os, event_type, network_id) + + def _handle_subnet_create(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_subnet_create(self, context, message): - event, payload = self._extact_event_payload(message) subnet = payload.get('subnet') subnet_id = subnet.get('id') if not utils.is_subnet_mappable(subnet): @@ -108,20 +142,32 @@ class Client(neutron_client_bindings.Client): if db_sub: LOG.info(_("DB entry for subnet %s already exists"), subnet_id) return - self.agent.queue_event(self.os, event, subnet) + self.agent.queue_event(self.os, event_type, subnet) + + def _handle_subnet_update(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_subnet_update(self, context, message): - event, payload = self._extact_event_payload(message) subnet = payload.get('subnet') - self.agent.queue_event(self.os, event, subnet) + self.agent.queue_event(self.os, event_type, subnet) + + def _handle_subnet_delete(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_subnet_delete(self, context, message): - event, payload = self._extact_event_payload(message) subnet_id = payload.get('subnet_id') - self.agent.queue_event(self.os, event, subnet_id) + self.agent.queue_event(self.os, event_type, subnet_id) + + def _handle_port_create(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_port_create(self, context, message): - event, payload = self._extact_event_payload(message) port = payload.get('port') port_id = port.get('id') if not utils.is_port_mappable(port): @@ -131,17 +177,25 @@ class Client(neutron_client_bindings.Client): if db_port: LOG.info(_("DB entry for port %s already exists"), port_id) return - self.agent.queue_event(self.os, event, port) + self.agent.queue_event(self.os, event_type, port) + + def _handle_port_update(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_port_update(self, context, message): - event, payload = self._extact_event_payload(message) port = payload.get('port') - self.agent.queue_event(self.os, event, port) + self.agent.queue_event(self.os, event_type, port) + + def _handle_port_delete(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_port_delete(self, context, message): - event, payload = self._extact_event_payload(message) port_id = payload.get('port_id') - self.agent.queue_event(self.os, event, port_id) + self.agent.queue_event(self.os, event_type, port_id) def get_power_vm_mapping(self): """ diff --git a/neutron-powervc/powervc/neutron/client/powervc_bindings.py b/neutron-powervc/powervc/neutron/client/powervc_bindings.py index d089973..859ce77 100644 --- a/neutron-powervc/powervc/neutron/client/powervc_bindings.py +++ b/neutron-powervc/powervc/neutron/client/powervc_bindings.py @@ -12,7 +12,6 @@ Created on Aug 1, 2013 from neutron.openstack.common import log as logging -from powervc.common import messaging from powervc.common.constants import POWERVC_OS from powervc.common.gettextutils import _ from powervc.neutron.client import neutron_client_bindings @@ -20,6 +19,13 @@ from powervc.neutron.common import constants from powervc.neutron.common import utils from powervc.neutron.db import powervc_db_v2 +from powervc.common import config as cfg +from powervc.common import messaging + +from oslo.messaging.notify import listener +from oslo.messaging import target +from oslo.messaging import transport + LOG = logging.getLogger(__name__) @@ -36,39 +42,55 @@ class Client(neutron_client_bindings.Client): self._create_amqp_listeners() def _create_amqp_listeners(self): - """Listen for AMQP messages from PowerVC""" - LOG.debug(_('Creating AMQP listeners')) + """Listen for AMQP messages from PowerVC.""" - def reconnect(): - LOG.info(_('Re-established connection to PowerVC Qpid broker')) - self.agent.queue_event(self.os, constants.EVENT_FULL_SYNC, None) + LOG.debug("Entry _create_amqp_listeners(pvc) method") - connection = messaging.PowerVCConnection(log=logging, - reconnect_handler=reconnect) - listener = connection.create_listener(constants.QPID_EXCHANGE, - constants.QPID_TOPIC) - listener.register_handler(constants.EVENT_NETWORK_CREATE, + trans = transport.get_transport(cfg.AMQP_POWERVC_CONF) + targets = [ + target.Target(exchange=constants.QPID_EXCHANGE, + topic=constants.QPID_TOPIC) + ] + endpoint = messaging.NotificationEndpoint(log=LOG) + + endpoint.register_handler(constants.EVENT_NETWORK_CREATE, self._handle_network_create) - listener.register_handler(constants.EVENT_NETWORK_UPDATE, + endpoint.register_handler(constants.EVENT_NETWORK_UPDATE, self._handle_network_update) - listener.register_handler(constants.EVENT_NETWORK_DELETE, + endpoint.register_handler(constants.EVENT_NETWORK_DELETE, self._handle_network_delete) - listener.register_handler(constants.EVENT_SUBNET_CREATE, + endpoint.register_handler(constants.EVENT_SUBNET_CREATE, self._handle_subnet_create) - listener.register_handler(constants.EVENT_SUBNET_UPDATE, + endpoint.register_handler(constants.EVENT_SUBNET_UPDATE, self._handle_subnet_update) - listener.register_handler(constants.EVENT_SUBNET_DELETE, + endpoint.register_handler(constants.EVENT_SUBNET_DELETE, self._handle_subnet_delete) - listener.register_handler(constants.EVENT_PORT_CREATE, + endpoint.register_handler(constants.EVENT_PORT_CREATE, self._handle_port_create) - listener.register_handler(constants.EVENT_PORT_UPDATE, + endpoint.register_handler(constants.EVENT_PORT_UPDATE, self._handle_port_update) - listener.register_handler(constants.EVENT_PORT_DELETE, + endpoint.register_handler(constants.EVENT_PORT_DELETE, self._handle_port_delete) - connection.start() - def _handle_network_create(self, context, message): - event, payload = self._extact_event_payload(message) + endpoints = [ + endpoint, + ] + + LOG.debug("Starting to listen...... ") + + pvc_neutron_listener = listener.\ + get_notification_listener(trans, targets, endpoints, + allow_requeue=False) + messaging.start_notification_listener(pvc_neutron_listener) + + LOG.debug("Exit _create_amqp_listeners(pvc) method") + + def _handle_network_create(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + network = payload.get('network') network_id = network.get('id') if not utils.is_network_mappable(network): @@ -78,20 +100,32 @@ class Client(neutron_client_bindings.Client): if db_net: LOG.info(_("DB entry for network %s already exists"), network_id) return - self.agent.queue_event(self.os, event, network) + self.agent.queue_event(self.os, event_type, network) + + def _handle_network_update(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_network_update(self, context, message): - event, payload = self._extact_event_payload(message) network = payload.get('network') - self.agent.queue_event(self.os, event, network) + self.agent.queue_event(self.os, event_type, network) + + def _handle_network_delete(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_network_delete(self, context, message): - event, payload = self._extact_event_payload(message) network_id = payload.get('network_id') - self.agent.queue_event(self.os, event, network_id) + self.agent.queue_event(self.os, event_type, network_id) + + def _handle_subnet_create(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_subnet_create(self, context, message): - event, payload = self._extact_event_payload(message) subnet = payload.get('subnet') subnet_id = subnet.get('id') if not utils.is_subnet_mappable(subnet): @@ -101,20 +135,32 @@ class Client(neutron_client_bindings.Client): if db_sub: LOG.info(_("DB entry for subnet %s already exists"), subnet_id) return - self.agent.queue_event(self.os, event, subnet) + self.agent.queue_event(self.os, event_type, subnet) + + def _handle_subnet_update(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_subnet_update(self, context, message): - event, payload = self._extact_event_payload(message) subnet = payload.get('subnet') - self.agent.queue_event(self.os, event, subnet) + self.agent.queue_event(self.os, event_type, subnet) + + def _handle_subnet_delete(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_subnet_delete(self, context, message): - event, payload = self._extact_event_payload(message) subnet_id = payload.get('subnet_id') - self.agent.queue_event(self.os, event, subnet_id) + self.agent.queue_event(self.os, event_type, subnet_id) + + def _handle_port_create(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_port_create(self, context, message): - event, payload = self._extact_event_payload(message) port = payload.get('port') port_id = port.get('id') if not utils.is_port_mappable(port): @@ -124,14 +170,22 @@ class Client(neutron_client_bindings.Client): if db_port: LOG.info(_("DB entry for port %s already exists"), port_id) return - self.agent.queue_event(self.os, event, port) + self.agent.queue_event(self.os, event_type, port) + + def _handle_port_update(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_port_update(self, context, message): - event, payload = self._extact_event_payload(message) port = payload.get('port') - self.agent.queue_event(self.os, event, port) + self.agent.queue_event(self.os, event_type, port) + + def _handle_port_delete(self, + context=None, + ctxt=None, + event_type=None, + payload=None): - def _handle_port_delete(self, context, message): - event, payload = self._extact_event_payload(message) port_id = payload.get('port_id') - self.agent.queue_event(self.os, event, port_id) + self.agent.queue_event(self.os, event_type, port_id) diff --git a/neutron-powervc/powervc/neutron/common/constants.py b/neutron-powervc/powervc/neutron/common/constants.py index 854356c..0abefe8 100644 --- a/neutron-powervc/powervc/neutron/common/constants.py +++ b/neutron-powervc/powervc/neutron/common/constants.py @@ -51,7 +51,7 @@ PORT_UPDATE_FIELDS = ['name'] # Qpid message handling QPID_EXCHANGE = 'neutron' -QPID_TOPIC = 'notifications.info' +QPID_TOPIC = 'notifications' EVENT_END_THREAD = 'thread.end' EVENT_FULL_SYNC = 'full.sync' diff --git a/nova-powervc/powervc/nova/driver/compute/manager.py b/nova-powervc/powervc/nova/driver/compute/manager.py index a1cfeae..faa1a78 100644 --- a/nova-powervc/powervc/nova/driver/compute/manager.py +++ b/nova-powervc/powervc/nova/driver/compute/manager.py @@ -36,13 +36,17 @@ from nova.objects import base as obj_base from powervc.nova.driver.compute import computes from powervc.nova.driver.compute import constants from powervc.nova.driver.compute import task_states as pvc_task_states -from powervc.common import messaging from powervc.nova.driver.virt.powervc.sync import flavorsync from powervc import utils from powervc.common import utils as utills from powervc.common.gettextutils import _ from powervc.common.client import delegate as ctx_delegate +from powervc.common import messaging + +from oslo.messaging.notify import listener +from oslo.messaging import target +from oslo.messaging import transport LOG = logging.getLogger(__name__) @@ -916,65 +920,69 @@ class PowerVCCloudManager(manager.Manager): return ['default'] def _create_local_listeners(self, ctx): + """Listen for local(OpenStack) compute node notifications.""" - def reconnect_handler(): - LOG.debug(_('Re-established connection to local Qpid broker')) + LOG.debug("Enter _create_local_listeners method") - # Create Qpid connection and listener - conn = messaging.LocalConnection(reconnect_handler=reconnect_handler, - context=ctx, - log=logging) - listener = conn.create_listener('nova', 'notifications.info') + trans = transport.get_transport(cfg.AMQP_OPENSTACK_CONF) + targets = [ + target.Target(exchange='nova', topic='notifications') + ] + endpoint = messaging.NotificationEndpoint(log=LOG, sec_context=ctx) # Instance state changes - listener.register_handler([ + endpoint.register_handler([ constants.EVENT_INSTANCE_RESIZE, constants.EVENT_INSTANCE_RESIZE_CONFIRM, constants.EVENT_INSTANCE_LIVE_MIGRATE], self._handle_local_deferred_host_updates) # Instance creation - listener.register_handler(constants.EVENT_INSTANCE_CREATE, + endpoint.register_handler(constants.EVENT_INSTANCE_CREATE, self._handle_local_instance_create) + endpoints = [ + endpoint, + ] - conn.start() + LOG.debug("Starting to listen...... ") + + local_nova_listener = listener.\ + get_notification_listener(trans, targets, endpoints, + allow_requeue=False) + messaging.start_notification_listener(local_nova_listener) + + LOG.debug("Exit _create_local_listeners method") def _create_powervc_listeners(self, ctx): - """ - Listen for out-of-band changes made in PowerVC. + """Listen for out-of-band changes made in PowerVC. - This method creates the connection to the PowerVC Qpid broker and - sets up handlers so that any changes made directly in PowerVC are - reflected in the local OS. + Any changes made directly in PowerVC will be reflected in the local OS. :param: ctx The security context """ - # Function to call if we lose the Qpid connection and then get it back - def reconnect_handler(): - LOG.debug(_('Re-established connection to Qpid broker, sync all ' - 'instances on next sync interval')) - self.full_instance_sync_required = True - # Create Qpid connection and listener - conn = messaging.PowerVCConnection(reconnect_handler=reconnect_handler, - context=ctx, - log=logging) - listener = conn.create_listener('nova', 'notifications.info') + LOG.debug("Enter _create_powervc_listeners method") + + trans = transport.get_transport(cfg.AMQP_POWERVC_CONF) + targets = [ + target.Target(exchange='nova', topic='notifications') + ] + endpoint = messaging.NotificationEndpoint(log=LOG, sec_context=ctx) # Instance creation - listener.register_handler(constants.EVENT_INSTANCE_CREATE, + endpoint.register_handler(constants.EVENT_INSTANCE_CREATE, self._handle_powervc_instance_create) # onboarding end - listener.register_handler(constants.EVENT_INSTANCE_IMPORT, + endpoint.register_handler(constants.EVENT_INSTANCE_IMPORT, self._handle_powervc_instance_create) # Instance deletion - listener.register_handler(constants.EVENT_INSTANCE_DELETE, + endpoint.register_handler(constants.EVENT_INSTANCE_DELETE, self._handle_powervc_instance_delete) # Instance state changes - listener.register_handler([ + endpoint.register_handler([ constants.EVENT_INSTANCE_UPDATE, constants.EVENT_INSTANCE_POWER_ON, constants.EVENT_INSTANCE_POWER_OFF, @@ -986,16 +994,30 @@ class PowerVCCloudManager(manager.Manager): self._handle_powervc_instance_state) # Instance volume attach/detach event handling - listener.register_handler([ + endpoint.register_handler([ constants.EVENT_INSTANCE_VOLUME_ATTACH, constants.EVENT_INSTANCE_VOLUME_DETACH], self._handle_volume_attach_or_detach) - conn.start() + endpoints = [ + endpoint, + ] - def _handle_local_instance_create(self, context, message): - """ - Handle local deployment completed messages sent from the + LOG.debug("Starting to listen...... ") + + pvc_nova_listener = listener.\ + get_notification_listener(trans, targets, endpoints, + allow_requeue=False) + messaging.start_notification_listener(pvc_nova_listener) + + LOG.debug("Exit _create_powervc_listeners method") + + def _handle_local_instance_create(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Handle local deployment completed messages sent from the hosting OS. This is need so we can tell the hosting OS to sync the latest state from PowerVC. Once a deployment completes in PowerVC the instances go into activating task @@ -1004,11 +1026,10 @@ class PowerVCCloudManager(manager.Manager): back from spawn thus sending the completed event. :param: context The security context - :param: message The AMQP message sent from OpenStack (dictionary) + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - LOG.debug(_("Handling local notification: %s" % - message.get('event_type'))) - payload = message.get('payload') hosting_id = payload.get('instance_id') # Attempt to get the local instance. @@ -1016,7 +1037,7 @@ class PowerVCCloudManager(manager.Manager): try: instance = db.instance_get_by_uuid(context, hosting_id) except exception.InstanceNotFound: - LOG.debug(_("Local Instance %s Not Found" % hosting_id)) + LOG.debug(_("Local Instance %s Not Found") % hosting_id) return # Get the PVC instance @@ -1029,28 +1050,33 @@ class PowerVCCloudManager(manager.Manager): else: LOG.debug(_('PowerVC instance could not be found')) - def _handle_local_deferred_host_updates(self, context, message): - """ - Handle live migration completed messages sent from PowerVC. + def _handle_local_deferred_host_updates(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Handle live migration completed messages sent from PowerVC. :param: context The security context - :param: message The AMQP message sent from OpenStack (dictionary) + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - hosting_id = self._pre_process_message(message) + hosting_id = self._pre_process_message(payload) # Attempt to get the local instance. instance = None try: instance = db.instance_get_by_uuid(context, hosting_id) except exception.InstanceNotFound: - LOG.debug(_("Local Instance %s Not Found" % hosting_id)) + LOG.debug(_("Local Instance %s Not Found") % hosting_id) return # See if the instance is deferring host scheduling. # If it is exit immediately. if not self.driver._check_defer_placement(instance): - LOG.debug(_("Local Instance %s did not defer scheduling" - % hosting_id)) + LOG.debug(_("Local Instance %s did not defer scheduling") + % hosting_id) return # Get the PVC instance @@ -1064,19 +1090,24 @@ class PowerVCCloudManager(manager.Manager): self.driver.update_instance_host(context, instance) except Exception: LOG.debug(_('Problem updating local instance host ' - 'information, instance: %s' % instance['id'])) + 'information, instance: %s') % instance['id']) else: LOG.debug(_('Tried to update instance host value but the' ' instance could not be found in PowerVC')) - def _handle_powervc_instance_create(self, context, message): - """ - Handle instance create messages sent from PowerVC. + def _handle_powervc_instance_create(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Handle instance create messages sent from PowerVC. :param: context The security context - :param: message The AMQP message sent from OpenStack (dictionary) + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - powervc_instance_id = self._pre_process_message(message) + powervc_instance_id = self._pre_process_message(payload) # Check for matching local instance matched_instances = self._get_local_instance_by_pvc_id( @@ -1101,20 +1132,25 @@ class PowerVCCloudManager(manager.Manager): try: self._add_local_instance(context, instance) except Exception as e: - LOG.warning(_("Failed to insert instance due to: %s " - % str(e))) + LOG.warning(_("Failed to insert instance due to: %s ") + % str(e)) else: LOG.debug(_('Tried to add newly created instance but it could not ' 'be found in PowerVC')) - def _handle_powervc_instance_delete(self, context, message): - """ - Handle instance delete messages sent from PowerVC. + def _handle_powervc_instance_delete(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Handle instance delete messages sent from PowerVC. :param: context The security context - :param: message The AMQP message sent from OpenStack (dictionary) + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - powervc_instance_id = self._pre_process_message(message) + powervc_instance_id = self._pre_process_message(payload) # Check for matching local instance matched_instances = self._get_local_instance_by_pvc_id( @@ -1128,17 +1164,21 @@ class PowerVCCloudManager(manager.Manager): # Remove the instance from the local OS self._remove_local_instance(context, matched_instances[0]) - def _handle_powervc_instance_state(self, context, message): - """ - Handle instance state changes sent from PowerVC. This includes + def _handle_powervc_instance_state(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Handle instance state changes sent from PowerVC. This includes instance update and all other state changes caused by events like power on, power off, resize, live migration, and snapshot. :param: context The security context - :param: message The AMQP message sent from OpenStack (dictionary) + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - powervc_instance_id = self._pre_process_message(message) - event_type = message.get('event_type') + powervc_instance_id = self._pre_process_message(payload) local_instance = self.\ _get_matched_instance_by_pvc_id(context, powervc_instance_id) @@ -1151,25 +1191,29 @@ class PowerVCCloudManager(manager.Manager): self._update_state(context, local_instance, powervc_instance, powervc_instance_id, event_type) - def _handle_volume_attach_or_detach(self, context, message): - """ - Handle out of band volume attach or detach event + def _handle_volume_attach_or_detach(self, + context=None, + ctxt=None, + event_type=None, + payload=None): + """Handle out of band volume attach or detach event :param: context The security context - :param: message The AMQP message sent from OpenStack (dictionary) + :param: ctxt message context + :param: event_type message event type + :param: payload The AMQP message sent from OpenStack (dictionary) """ - powervc_instance_id = self._pre_process_message(message) + powervc_instance_id = self._pre_process_message(payload) local_instance = self.\ _get_matched_instance_by_pvc_id(context, powervc_instance_id) if not local_instance: return - payload = message.get('payload') powervc_volume_id = payload.get('volume_id') if powervc_volume_id is None: - LOG.warning(_('no valid volume for powervc instance %s' % - powervc_instance_id)) + LOG.warning(_('no valid volume for powervc instance %s') % + powervc_instance_id) return vol_id = self.cache_volume.get_by_id(powervc_volume_id) if vol_id is None: @@ -1190,16 +1234,13 @@ class PowerVCCloudManager(manager.Manager): self.sync_volume_attachment(context, powervc_instance_id, local_instance) - def _pre_process_message(self, message): - """ - Logging the event type and return the instance id of the nova server + def _pre_process_message(self, payload): + """Logging the event type and return the instance id of the nova server instance in the event - :param: message The AMQP message sent from OpenStack (dictionary) + :param: payload The AMQP message sent from OpenStack (dictionary) :returns instance id triggering the event """ - LOG.debug(_("Handling notification: %s" % message.get('event_type'))) - payload = message.get('payload') instance_id = payload.get('instance_id') return instance_id