Merge "Fix instance update notification publisher id"
This commit is contained in:
@@ -607,7 +607,7 @@ class API(base.Base):
|
|||||||
# send a state update notification for the initial create to
|
# send a state update notification for the initial create to
|
||||||
# show it going from non-existent to BUILDING
|
# show it going from non-existent to BUILDING
|
||||||
notifications.send_update_with_states(context, instance, None,
|
notifications.send_update_with_states(context, instance, None,
|
||||||
vm_states.BUILDING, None, None)
|
vm_states.BUILDING, None, None, service="api")
|
||||||
|
|
||||||
for security_group_id in security_groups:
|
for security_group_id in security_groups:
|
||||||
self.db.instance_add_security_group(elevated,
|
self.db.instance_add_security_group(elevated,
|
||||||
@@ -934,7 +934,8 @@ class API(base.Base):
|
|||||||
# if task or vm state changed
|
# if task or vm state changed
|
||||||
old_ref, instance_ref = self.db.instance_update_and_get_original(
|
old_ref, instance_ref = self.db.instance_update_and_get_original(
|
||||||
context, instance["id"], kwargs)
|
context, instance["id"], kwargs)
|
||||||
notifications.send_update(context, old_ref, instance_ref)
|
notifications.send_update(context, old_ref, instance_ref,
|
||||||
|
service="api")
|
||||||
|
|
||||||
return dict(instance_ref.iteritems())
|
return dict(instance_ref.iteritems())
|
||||||
|
|
||||||
@@ -1284,7 +1285,8 @@ class API(base.Base):
|
|||||||
context, instance_uuid, 'task_state', [None], task_state)
|
context, instance_uuid, 'task_state', [None], task_state)
|
||||||
|
|
||||||
notifications.send_update_with_states(context, instance, old_vm_state,
|
notifications.send_update_with_states(context, instance, old_vm_state,
|
||||||
instance["vm_state"], old_task_state, instance["task_state"])
|
instance["vm_state"], old_task_state, instance["task_state"],
|
||||||
|
service="api")
|
||||||
|
|
||||||
properties = {
|
properties = {
|
||||||
'instance_uuid': instance_uuid,
|
'instance_uuid': instance_uuid,
|
||||||
|
|||||||
@@ -43,18 +43,18 @@ FLAGS = flags.FLAGS
|
|||||||
FLAGS.register_opt(notify_state_opt)
|
FLAGS.register_opt(notify_state_opt)
|
||||||
|
|
||||||
|
|
||||||
def send_update(context, old_instance, new_instance, host=None):
|
def send_update(context, old_instance, new_instance, service=None, host=None):
|
||||||
"""Send compute.instance.update notification to report changes
|
"""Send compute.instance.update notification to report changes
|
||||||
in vm state and (optionally) task state
|
in vm state and (optionally) task state
|
||||||
"""
|
"""
|
||||||
|
|
||||||
send_update_with_states(context, new_instance, old_instance["vm_state"],
|
send_update_with_states(context, new_instance, old_instance["vm_state"],
|
||||||
new_instance["vm_state"], old_instance["task_state"],
|
new_instance["vm_state"], old_instance["task_state"],
|
||||||
new_instance["task_state"], host)
|
new_instance["task_state"], service, host)
|
||||||
|
|
||||||
|
|
||||||
def send_update_with_states(context, instance, old_vm_state, new_vm_state,
|
def send_update_with_states(context, instance, old_vm_state, new_vm_state,
|
||||||
old_task_state, new_task_state, host=None):
|
old_task_state, new_task_state, service=None, host=None):
|
||||||
"""Send compute.instance.update notification to report changes
|
"""Send compute.instance.update notification to report changes
|
||||||
in vm state and (optionally) task state
|
in vm state and (optionally) task state
|
||||||
"""
|
"""
|
||||||
@@ -76,14 +76,15 @@ def send_update_with_states(context, instance, old_vm_state, new_vm_state,
|
|||||||
if fire_update:
|
if fire_update:
|
||||||
try:
|
try:
|
||||||
_send_instance_update_notification(context, instance, old_vm_state,
|
_send_instance_update_notification(context, instance, old_vm_state,
|
||||||
old_task_state, new_vm_state, new_task_state, host)
|
old_task_state, new_vm_state, new_task_state, service,
|
||||||
|
host)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_("Failed to send state update notification"),
|
LOG.exception(_("Failed to send state update notification"),
|
||||||
instance=instance)
|
instance=instance)
|
||||||
|
|
||||||
|
|
||||||
def _send_instance_update_notification(context, instance, old_vm_state,
|
def _send_instance_update_notification(context, instance, old_vm_state,
|
||||||
old_task_state, new_vm_state, new_task_state, host=None):
|
old_task_state, new_vm_state, new_task_state, service=None, host=None):
|
||||||
"""Send 'compute.instance.exists' notification to inform observers
|
"""Send 'compute.instance.exists' notification to inform observers
|
||||||
about instance state changes"""
|
about instance state changes"""
|
||||||
|
|
||||||
@@ -117,10 +118,14 @@ def _send_instance_update_notification(context, instance, old_vm_state,
|
|||||||
image_meta_props = image_meta(system_metadata)
|
image_meta_props = image_meta(system_metadata)
|
||||||
payload["image_meta"] = image_meta_props
|
payload["image_meta"] = image_meta_props
|
||||||
|
|
||||||
if not host:
|
# if the service name (e.g. api/scheduler/compute) is not provided, default
|
||||||
host = FLAGS.host
|
# to "compute"
|
||||||
|
if not service:
|
||||||
|
service = "compute"
|
||||||
|
|
||||||
notifier_api.notify(context, host, 'compute.instance.update',
|
publisher_id = notifier_api.publisher_id(service, host)
|
||||||
|
|
||||||
|
notifier_api.notify(context, publisher_id, 'compute.instance.update',
|
||||||
notifier_api.INFO, payload)
|
notifier_api.INFO, payload)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -233,7 +233,8 @@ class Scheduler(object):
|
|||||||
# update instance state and notify
|
# update instance state and notify
|
||||||
(old_ref, new_instance_ref) = db.instance_update_and_get_original(
|
(old_ref, new_instance_ref) = db.instance_update_and_get_original(
|
||||||
context, instance_id, values)
|
context, instance_id, values)
|
||||||
notifications.send_update(context, old_ref, new_instance_ref)
|
notifications.send_update(context, old_ref, new_instance_ref,
|
||||||
|
service="scheduler")
|
||||||
|
|
||||||
src = instance_ref['host']
|
src = instance_ref['host']
|
||||||
cast_to_compute_host(context, src, 'live_migration',
|
cast_to_compute_host(context, src, 'live_migration',
|
||||||
|
|||||||
@@ -178,7 +178,8 @@ class SchedulerManager(manager.Manager):
|
|||||||
# update instance state and notify on the transition
|
# update instance state and notify on the transition
|
||||||
(old_ref, new_ref) = db.instance_update_and_get_original(context,
|
(old_ref, new_ref) = db.instance_update_and_get_original(context,
|
||||||
instance_uuid, updates)
|
instance_uuid, updates)
|
||||||
notifications.send_update(context, old_ref, new_ref)
|
notifications.send_update(context, old_ref, new_ref,
|
||||||
|
service="scheduler")
|
||||||
|
|
||||||
payload = dict(request_spec=request_spec,
|
payload = dict(request_spec=request_spec,
|
||||||
instance_properties=properties,
|
instance_properties=properties,
|
||||||
|
|||||||
@@ -54,7 +54,8 @@ class NotificationsTestCase(test.TestCase):
|
|||||||
stub_network=True,
|
stub_network=True,
|
||||||
notification_driver='nova.notifier.test_notifier',
|
notification_driver='nova.notifier.test_notifier',
|
||||||
network_manager='nova.network.manager.FlatManager',
|
network_manager='nova.network.manager.FlatManager',
|
||||||
notify_on_state_change="vm_and_task_state")
|
notify_on_state_change="vm_and_task_state",
|
||||||
|
host='testhost')
|
||||||
|
|
||||||
self.user_id = 'fake'
|
self.user_id = 'fake'
|
||||||
self.project_id = 'fake'
|
self.project_id = 'fake'
|
||||||
@@ -167,3 +168,33 @@ class NotificationsTestCase(test.TestCase):
|
|||||||
self.assertEquals(vm_states.BUILDING, payload["state"])
|
self.assertEquals(vm_states.BUILDING, payload["state"])
|
||||||
self.assertEquals(task_states.SPAWNING, payload["old_task_state"])
|
self.assertEquals(task_states.SPAWNING, payload["old_task_state"])
|
||||||
self.assertEquals(None, payload["new_task_state"])
|
self.assertEquals(None, payload["new_task_state"])
|
||||||
|
|
||||||
|
def test_update_no_service_name(self):
|
||||||
|
notifications.send_update_with_states(self.context, self.instance,
|
||||||
|
vm_states.BUILDING, vm_states.BUILDING, task_states.SPAWNING,
|
||||||
|
None)
|
||||||
|
self.assertEquals(1, len(test_notifier.NOTIFICATIONS))
|
||||||
|
|
||||||
|
# service name should default to 'compute'
|
||||||
|
notif = test_notifier.NOTIFICATIONS[0]
|
||||||
|
self.assertEquals('compute.testhost', notif['publisher_id'])
|
||||||
|
|
||||||
|
def test_update_with_service_name(self):
|
||||||
|
notifications.send_update_with_states(self.context, self.instance,
|
||||||
|
vm_states.BUILDING, vm_states.BUILDING, task_states.SPAWNING,
|
||||||
|
None, service="testservice")
|
||||||
|
self.assertEquals(1, len(test_notifier.NOTIFICATIONS))
|
||||||
|
|
||||||
|
# service name should default to 'compute'
|
||||||
|
notif = test_notifier.NOTIFICATIONS[0]
|
||||||
|
self.assertEquals('testservice.testhost', notif['publisher_id'])
|
||||||
|
|
||||||
|
def test_update_with_host_name(self):
|
||||||
|
notifications.send_update_with_states(self.context, self.instance,
|
||||||
|
vm_states.BUILDING, vm_states.BUILDING, task_states.SPAWNING,
|
||||||
|
None, host="someotherhost")
|
||||||
|
self.assertEquals(1, len(test_notifier.NOTIFICATIONS))
|
||||||
|
|
||||||
|
# service name should default to 'compute'
|
||||||
|
notif = test_notifier.NOTIFICATIONS[0]
|
||||||
|
self.assertEquals('compute.someotherhost', notif['publisher_id'])
|
||||||
|
|||||||
Reference in New Issue
Block a user