Merge "Scheduler notifications added."
This commit is contained in:
		@@ -21,9 +21,9 @@ from nova.openstack.common import cfg
 | 
			
		||||
from nova import rpc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
notification_topic_opt = cfg.StrOpt('notification_topic',
 | 
			
		||||
        default='notifications',
 | 
			
		||||
        help='RabbitMQ topic used for Nova notifications')
 | 
			
		||||
notification_topic_opt = cfg.ListOpt('notification_topics',
 | 
			
		||||
        default=['notifications', ],
 | 
			
		||||
        help='AMQP topic used for Nova notifications')
 | 
			
		||||
 | 
			
		||||
FLAGS = flags.FLAGS
 | 
			
		||||
FLAGS.register_opt(notification_topic_opt)
 | 
			
		||||
@@ -35,5 +35,10 @@ def notify(message):
 | 
			
		||||
    priority = message.get('priority',
 | 
			
		||||
                           FLAGS.default_notification_level)
 | 
			
		||||
    priority = priority.lower()
 | 
			
		||||
    topic = '%s.%s' % (FLAGS.notification_topic, priority)
 | 
			
		||||
    rpc.notify(context, topic, message)
 | 
			
		||||
    for topic in FLAGS.notification_topics:
 | 
			
		||||
        topic = '%s.%s' % (topic, priority)
 | 
			
		||||
        try:
 | 
			
		||||
            rpc.notify(context, topic, message)
 | 
			
		||||
        except Exception, e:
 | 
			
		||||
            LOG.exception(_("Could not send notification to %(topic)s. "
 | 
			
		||||
                            "Payload=%(message)s" % locals()))
 | 
			
		||||
 
 | 
			
		||||
@@ -24,6 +24,7 @@ import operator
 | 
			
		||||
from nova import exception
 | 
			
		||||
from nova import flags
 | 
			
		||||
from nova import log as logging
 | 
			
		||||
from nova.notifier import api as notifier
 | 
			
		||||
from nova.scheduler import driver
 | 
			
		||||
from nova.scheduler import least_cost
 | 
			
		||||
from nova.scheduler import scheduler_options
 | 
			
		||||
@@ -63,6 +64,10 @@ class DistributedScheduler(driver.Scheduler):
 | 
			
		||||
        LOG.debug(_("Attempting to build %(num_instances)d instance(s)") %
 | 
			
		||||
                locals())
 | 
			
		||||
 | 
			
		||||
        payload = dict(request_spec=request_spec)
 | 
			
		||||
        notifier.notify(notifier.publisher_id("scheduler"),
 | 
			
		||||
                        'scheduler.run_instance.start', notifier.INFO, payload)
 | 
			
		||||
 | 
			
		||||
        weighted_hosts = self._schedule(context, "compute", request_spec,
 | 
			
		||||
                                        *args, **kwargs)
 | 
			
		||||
 | 
			
		||||
@@ -85,6 +90,9 @@ class DistributedScheduler(driver.Scheduler):
 | 
			
		||||
            if instance:
 | 
			
		||||
                instances.append(instance)
 | 
			
		||||
 | 
			
		||||
        notifier.notify(notifier.publisher_id("scheduler"),
 | 
			
		||||
                        'scheduler.run_instance.end', notifier.INFO, payload)
 | 
			
		||||
 | 
			
		||||
        return instances
 | 
			
		||||
 | 
			
		||||
    def schedule_prep_resize(self, context, request_spec, *args, **kwargs):
 | 
			
		||||
@@ -112,6 +120,14 @@ class DistributedScheduler(driver.Scheduler):
 | 
			
		||||
            kwargs):
 | 
			
		||||
        """Create the requested resource in this Zone."""
 | 
			
		||||
        instance = self.create_instance_db_entry(context, request_spec)
 | 
			
		||||
 | 
			
		||||
        payload = dict(request_spec=request_spec,
 | 
			
		||||
                       weighted_host=weighted_host.to_dict(),
 | 
			
		||||
                       instance_id=instance['uuid'])
 | 
			
		||||
        notifier.notify(notifier.publisher_id("scheduler"),
 | 
			
		||||
                        'scheduler.run_instance.scheduled', notifier.INFO,
 | 
			
		||||
                        payload)
 | 
			
		||||
 | 
			
		||||
        driver.cast_to_compute_host(context, weighted_host.host_state.host,
 | 
			
		||||
                'run_instance', instance_uuid=instance['uuid'], **kwargs)
 | 
			
		||||
        inst = driver.encode_instance(instance, local=True)
 | 
			
		||||
@@ -163,6 +179,10 @@ class DistributedScheduler(driver.Scheduler):
 | 
			
		||||
        # unfiltered_hosts_dict is {host : ZoneManager.HostInfo()}
 | 
			
		||||
        unfiltered_hosts_dict = self.host_manager.get_all_host_states(
 | 
			
		||||
                elevated, topic)
 | 
			
		||||
 | 
			
		||||
        # Note: remember, we are using an iterator here. So only
 | 
			
		||||
        # traverse this list once. This can bite you if the hosts
 | 
			
		||||
        # are being scanned in a filter or weighing function.
 | 
			
		||||
        hosts = unfiltered_hosts_dict.itervalues()
 | 
			
		||||
 | 
			
		||||
        num_instances = request_spec.get('num_instances', 1)
 | 
			
		||||
 
 | 
			
		||||
@@ -59,6 +59,14 @@ class WeightedHost(object):
 | 
			
		||||
        self.weight = weight
 | 
			
		||||
        self.host_state = host_state
 | 
			
		||||
 | 
			
		||||
    def to_dict(self):
 | 
			
		||||
        x = dict(weight=self.weight)
 | 
			
		||||
        if self.host_state:
 | 
			
		||||
            x['host'] = self.host_state.host
 | 
			
		||||
        if self.zone:
 | 
			
		||||
            x['zone'] = self.zone
 | 
			
		||||
        return x
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def noop_cost_fn(host_state, weighing_properties):
 | 
			
		||||
    """Return a pre-weight cost of 1 for each host"""
 | 
			
		||||
 
 | 
			
		||||
@@ -88,8 +88,8 @@ class NotifierTestCase(test.TestCase):
 | 
			
		||||
    def test_rabbit_priority_queue(self):
 | 
			
		||||
        self.stubs.Set(nova.flags.FLAGS, 'notification_driver',
 | 
			
		||||
                'nova.notifier.rabbit_notifier')
 | 
			
		||||
        self.stubs.Set(nova.flags.FLAGS, 'notification_topic',
 | 
			
		||||
                'testnotify')
 | 
			
		||||
        self.stubs.Set(nova.flags.FLAGS, 'notification_topics',
 | 
			
		||||
                       ['testnotify', ])
 | 
			
		||||
 | 
			
		||||
        self.test_topic = None
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user