Transform volume.usage notification

The volume.usage notification has been transformed to
the versioned notification framework.

Change-Id: Ica45a95d26b602f9a149d42516baf4b84fc01cec
Implements: bp versioned-notification-transformation-stein
This commit is contained in:
Takashi NATSUME 2018-07-05 17:25:23 +09:00
parent c5a7002bd5
commit 5859741f4d
12 changed files with 299 additions and 48 deletions

View File

@ -0,0 +1,22 @@
{
"event_type": "volume.usage",
"payload": {
"nova_object.data": {
"availability_zone": "nova",
"instance_uuid": "88fde343-13a8-4047-84fb-2657d5e702f9",
"last_refreshed": "2012-10-29T13:42:11Z",
"project_id": "6f70656e737461636b20342065766572",
"read_bytes": 0,
"reads": 0,
"user_id": "fake",
"volume_id": "a07f71dc-8151-4e7d-a0cc-cd24a3f11113",
"write_bytes": 0,
"writes": 0
},
"nova_object.name": "VolumeUsagePayload",
"nova_object.namespace": "nova",
"nova_object.version": "1.0"
},
"priority": "INFO",
"publisher_id": "nova-compute:compute"
}

View File

@ -5567,8 +5567,8 @@ class ComputeManager(manager.Manager):
vol_usage.curr_writes = wr_req
vol_usage.curr_write_bytes = wr_bytes
vol_usage.save(update_totals=True)
self.notifier.info(context, 'volume.usage',
compute_utils.usage_volume_info(vol_usage))
self.notifier.info(context, 'volume.usage', vol_usage.to_dict())
compute_utils.notify_about_volume_usage(context, vol_usage, self.host)
def _detach_volume(self, context, bdm, instance, destroy_bdm=True,
attachment_id=None):
@ -7441,8 +7441,9 @@ class ComputeManager(manager.Manager):
vol_usage.curr_writes = usage['wr_req']
vol_usage.curr_write_bytes = usage['wr_bytes']
vol_usage.save()
self.notifier.info(context, 'volume.usage',
compute_utils.usage_volume_info(vol_usage))
self.notifier.info(context, 'volume.usage', vol_usage.to_dict())
compute_utils.notify_about_volume_usage(context, vol_usage,
self.host)
@periodic_task.periodic_task(spacing=CONF.volume_usage_poll_interval)
def _poll_volume_usage(self, context):

View File

@ -43,6 +43,7 @@ from nova.notifications.objects import keypair as keypair_notification
from nova.notifications.objects import libvirt as libvirt_notification
from nova.notifications.objects import metrics as metrics_notification
from nova.notifications.objects import server_group as sg_notification
from nova.notifications.objects import volume as volume_notification
from nova import objects
from nova.objects import fields
from nova import rpc
@ -834,6 +835,28 @@ def notify_about_libvirt_connect_error(context, ip, exception, tb):
notification.emit(context)
@rpc.if_notifications_enabled
def notify_about_volume_usage(context, vol_usage, host):
"""Send versioned notification about the volume usage
:param context: the request context
:param vol_usage: the volume usage object
:param host: the host emitting the notification
"""
payload = volume_notification.VolumeUsagePayload(
vol_usage=vol_usage)
notification = volume_notification.VolumeUsageNotification(
context=context,
priority=fields.NotificationPriority.INFO,
publisher=notification_base.NotificationPublisher(
host=host, source=fields.NotificationSource.COMPUTE),
event_type=notification_base.EventType(
object='volume',
action=fields.NotificationAction.USAGE),
payload=payload)
notification.emit(context)
def refresh_info_cache_for_instance(context, instance):
"""Refresh the info cache for an instance.
@ -849,37 +872,6 @@ def refresh_info_cache_for_instance(context, instance):
"was not found", instance=instance)
def usage_volume_info(vol_usage):
def null_safe_str(s):
return str(s) if s else ''
tot_refreshed = vol_usage.tot_last_refreshed
curr_refreshed = vol_usage.curr_last_refreshed
if tot_refreshed and curr_refreshed:
last_refreshed_time = max(tot_refreshed, curr_refreshed)
elif tot_refreshed:
last_refreshed_time = tot_refreshed
else:
# curr_refreshed must be set
last_refreshed_time = curr_refreshed
usage_info = dict(
volume_id=vol_usage.volume_id,
tenant_id=vol_usage.project_id,
user_id=vol_usage.user_id,
availability_zone=vol_usage.availability_zone,
instance_id=vol_usage.instance_uuid,
last_refreshed=null_safe_str(last_refreshed_time),
reads=vol_usage.tot_reads + vol_usage.curr_reads,
read_bytes=vol_usage.tot_read_bytes +
vol_usage.curr_read_bytes,
writes=vol_usage.tot_writes + vol_usage.curr_writes,
write_bytes=vol_usage.tot_write_bytes +
vol_usage.curr_write_bytes)
return usage_info
def get_reboot_type(task_state, current_power_state):
"""Checks if the current instance state requires a HARD reboot."""
if current_power_state != power_state.RUNNING:

View File

@ -66,7 +66,8 @@ class EventType(NotificationObject):
# Version 1.15: LIVE_MIGRATION_FORCE_COMPLETE is added to the
# NotificationActionField enum
# Version 1.16: CONNECT is added to NotificationActionField enum
VERSION = '1.16'
# Version 1.17: USAGE is added to NotificationActionField enum
VERSION = '1.17'
fields = {
'object': fields.StringField(nullable=False),

View File

@ -0,0 +1,64 @@
# Copyright 2018 NTT Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.notifications.objects import base
from nova.objects import base as nova_base
from nova.objects import fields
@base.notification_sample('volume-usage.json')
@nova_base.NovaObjectRegistry.register_notification
class VolumeUsageNotification(base.NotificationBase):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'payload': fields.ObjectField('VolumeUsagePayload')
}
@nova_base.NovaObjectRegistry.register_notification
class VolumeUsagePayload(base.NotificationPayloadBase):
# Version 1.0: Initial version
VERSION = '1.0'
SCHEMA = {
'volume_id': ('vol_usage', 'volume_id'),
'project_id': ('vol_usage', 'project_id'),
'user_id': ('vol_usage', 'user_id'),
'availability_zone': ('vol_usage', 'availability_zone'),
'instance_uuid': ('vol_usage', 'instance_uuid'),
'last_refreshed': ('vol_usage', 'last_refreshed'),
'reads': ('vol_usage', 'reads'),
'read_bytes': ('vol_usage', 'read_bytes'),
'writes': ('vol_usage', 'writes'),
'write_bytes': ('vol_usage', 'write_bytes')
}
fields = {
'volume_id': fields.UUIDField(),
'project_id': fields.StringField(nullable=True),
'user_id': fields.StringField(nullable=True),
'availability_zone': fields.StringField(nullable=True),
'instance_uuid': fields.UUIDField(nullable=True),
'last_refreshed': fields.DateTimeField(nullable=True),
'reads': fields.IntegerField(),
'read_bytes': fields.IntegerField(),
'writes': fields.IntegerField(),
'write_bytes': fields.IntegerField()
}
def __init__(self, vol_usage):
super(VolumeUsagePayload, self).__init__()
self.populate_schema(vol_usage=vol_usage)

View File

@ -832,6 +832,7 @@ class NotificationAction(BaseNovaEnum):
UNLOCK = 'unlock'
UPDATE_PROP = 'update_prop'
CONNECT = 'connect'
USAGE = 'usage'
ALL = (UPDATE, EXCEPTION, DELETE, PAUSE, UNPAUSE, RESIZE, VOLUME_SWAP,
SUSPEND, POWER_ON, REBOOT, SHUTDOWN, SNAPSHOT, INTERFACE_ATTACH,
@ -844,7 +845,7 @@ class NotificationAction(BaseNovaEnum):
SOFT_DELETE, TRIGGER_CRASH_DUMP, UNRESCUE, UNSHELVE, ADD_HOST,
REMOVE_HOST, ADD_MEMBER, UPDATE_METADATA, LOCK, UNLOCK,
REBUILD_SCHEDULED, UPDATE_PROP, LIVE_MIGRATION_FORCE_COMPLETE,
CONNECT)
CONNECT, USAGE)
# TODO(rlrossit): These should be changed over to be a StateMachine enum from

View File

@ -41,6 +41,32 @@ class VolumeUsage(base.NovaPersistentObject, base.NovaObject):
'curr_write_bytes': fields.IntegerField()
}
@property
def last_refreshed(self):
if self.tot_last_refreshed and self.curr_last_refreshed:
return max(self.tot_last_refreshed, self.curr_last_refreshed)
elif self.tot_last_refreshed:
return self.tot_last_refreshed
else:
# curr_last_refreshed must be set
return self.curr_last_refreshed
@property
def reads(self):
return self.tot_reads + self.curr_reads
@property
def read_bytes(self):
return self.tot_read_bytes + self.curr_read_bytes
@property
def writes(self):
return self.tot_writes + self.curr_writes
@property
def write_bytes(self):
return self.tot_write_bytes + self.curr_write_bytes
@staticmethod
def _from_db_object(context, vol_usage, db_vol_usage):
for field in vol_usage.fields:
@ -57,3 +83,18 @@ class VolumeUsage(base.NovaPersistentObject, base.NovaObject):
self.instance_uuid, self.project_id, self.user_id,
self.availability_zone, update_totals=update_totals)
self._from_db_object(self._context, self, db_vol_usage)
def to_dict(self):
return {
'volume_id': self.volume_id,
'tenant_id': self.project_id,
'user_id': self.user_id,
'availability_zone': self.availability_zone,
'instance_id': self.instance_uuid,
'last_refreshed': str(
self.last_refreshed) if self.last_refreshed else '',
'reads': self.reads,
'read_bytes': self.read_bytes,
'writes': self.writes,
'write_bytes': self.write_bytes
}

View File

@ -0,0 +1,66 @@
# Copyright 2018 NTT Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import context
from nova.tests import fixtures
from nova.tests.functional.notification_sample_tests \
import notification_sample_base
from nova.tests.unit import fake_notifier
class TestVolumeUsageNotificationSample(
notification_sample_base.NotificationSampleTestBase):
def setUp(self):
self.flags(use_neutron=True)
self.flags(volume_usage_poll_interval=60)
super(TestVolumeUsageNotificationSample, self).setUp()
self.neutron = fixtures.NeutronFixture(self)
self.useFixture(self.neutron)
self.cinder = fixtures.CinderFixtureNewAttachFlow(self)
self.useFixture(self.cinder)
def _setup_server_with_volume_attached(self):
server = self._boot_a_server(
extra_params={'networks': [{'port': self.neutron.port_1['id']}]})
self._attach_volume_to_server(server, self.cinder.SWAP_OLD_VOL)
fake_notifier.reset()
return server
def test_volume_usage_with_detaching_volume(self):
server = self._setup_server_with_volume_attached()
self.api.delete_server_volume(server['id'],
self.cinder.SWAP_OLD_VOL)
self._wait_for_notification('instance.volume_detach.end')
# 0. volume_detach-start
# 1. volume.usage
# 2. volume_detach-end
self.assertEqual(3, len(fake_notifier.VERSIONED_NOTIFICATIONS))
self._verify_notification(
'volume-usage',
replacements={'instance_uuid': server['id']},
actual=fake_notifier.VERSIONED_NOTIFICATIONS[1])
def test_instance_poll_volume_usage(self):
server = self._setup_server_with_volume_attached()
self.compute.manager._poll_volume_usage(context.get_admin_context())
self.assertEqual(1, len(fake_notifier.VERSIONED_NOTIFICATIONS))
self._verify_notification(
'volume-usage',
replacements={'instance_uuid': server['id']},
actual=fake_notifier.VERSIONED_NOTIFICATIONS[0])

View File

@ -799,23 +799,30 @@ class ComputeVolumeTestCase(BaseTestCase):
mock_get_bdms.assert_called_once_with(ctxt, use_slave=True)
@mock.patch.object(compute_utils, 'notify_about_volume_usage')
@mock.patch.object(compute_manager.ComputeManager, '_get_host_volume_bdms')
@mock.patch.object(compute_manager.ComputeManager,
'_update_volume_usage_cache')
@mock.patch.object(fake.FakeDriver, 'get_all_volume_usage')
def test_poll_volume_usage_with_data(self, mock_get_usage, mock_update,
mock_get_bdms):
ctxt = 'MockContext'
mock_get_usage.side_effect = lambda x, y: [3, 4]
def test_poll_volume_usage_with_data(self, mock_get_usage, mock_get_bdms,
mock_notify):
# All the mocks are called
mock_get_bdms.return_value = [1, 2]
mock_get_usage.return_value = [
{'volume': uuids.volume,
'instance': self.instance_object,
'rd_req': 100,
'rd_bytes': 500,
'wr_req': 25,
'wr_bytes': 75}]
self.flags(volume_usage_poll_interval=10)
self.compute._poll_volume_usage(ctxt)
self.compute._poll_volume_usage(self.context)
mock_get_bdms.assert_called_once_with(ctxt, use_slave=True)
mock_update.assert_called_once_with(ctxt, [3, 4])
mock_get_bdms.assert_called_once_with(self.context, use_slave=True)
mock_notify.assert_called_once_with(
self.context, test.MatchType(objects.VolumeUsage),
self.compute.host)
@mock.patch.object(compute_utils, 'notify_about_volume_usage')
@mock.patch('nova.context.RequestContext.elevated')
@mock.patch('nova.compute.utils.notify_about_volume_attach_detach')
@mock.patch.object(objects.BlockDeviceMapping,
@ -826,7 +833,7 @@ class ComputeVolumeTestCase(BaseTestCase):
@mock.patch.object(fake.FakeDriver, 'instance_exists')
def test_detach_volume_usage(self, mock_exists, mock_get_all,
mock_get_bdms, mock_stats, mock_get,
mock_notify, mock_elevate):
mock_notify, mock_elevate, mock_notify_usage):
mock_elevate.return_value = self.context
# Test that detach volume update the volume usage cache table correctly
instance = self._create_fake_instance_obj()
@ -891,6 +898,12 @@ class ComputeVolumeTestCase(BaseTestCase):
self.assertIsNone(payload['availability_zone'])
msg = fake_notifier.NOTIFICATIONS[3]
self.assertEqual('compute.instance.volume.detach', msg.event_type)
mock_notify_usage.assert_has_calls([
mock.call(self.context, test.MatchType(objects.VolumeUsage),
self.compute.host),
mock.call(self.context, test.MatchType(objects.VolumeUsage),
self.compute.host)])
self.assertEqual(2, mock_notify_usage.call_count)
# Check the database for the
volume_usages = db.vol_get_usage_by_time(self.context, 0)

View File

@ -17,6 +17,7 @@
"""Tests For miscellaneous util methods used with compute."""
import copy
import datetime
import string
import traceback
@ -879,6 +880,44 @@ class UsageInfoTestCase(test.TestCase):
glance.generate_glance_url(self.context), uuids.fake_image_ref)
self.assertEqual(payload['image_ref_url'], image_ref_url)
def test_notify_about_volume_usage(self):
# Ensure 'volume.usage' notification generates appropriate usage data.
vol_usage = objects.VolumeUsage(
id=1, volume_id=uuids.volume, instance_uuid=uuids.instance,
project_id=self.project_id, user_id=self.user_id,
availability_zone='AZ1',
tot_last_refreshed=datetime.datetime(second=1, minute=1, hour=1,
day=5, month=7, year=2018),
tot_reads=100, tot_read_bytes=100,
tot_writes=100, tot_write_bytes=100,
curr_last_refreshed=datetime.datetime(second=1, minute=1, hour=2,
day=5, month=7, year=2018),
curr_reads=100, curr_read_bytes=100,
curr_writes=100, curr_write_bytes=100)
compute_utils.notify_about_volume_usage(self.context, vol_usage,
'fake-compute')
self.assertEqual(1, len(fake_notifier.VERSIONED_NOTIFICATIONS))
notification = fake_notifier.VERSIONED_NOTIFICATIONS[0]
self.assertEqual('INFO', notification['priority'])
self.assertEqual('volume.usage', notification['event_type'])
self.assertEqual('nova-compute:fake-compute',
notification['publisher_id'])
payload = notification['payload']['nova_object.data']
self.assertEqual(uuids.volume, payload['volume_id'])
self.assertEqual(uuids.instance, payload['instance_uuid'])
self.assertEqual(self.project_id, payload['project_id'])
self.assertEqual(self.user_id, payload['user_id'])
self.assertEqual('AZ1', payload['availability_zone'])
self.assertEqual('2018-07-05T02:01:01Z', payload['last_refreshed'])
self.assertEqual(200, payload['read_bytes'])
self.assertEqual(200, payload['reads'])
self.assertEqual(200, payload['write_bytes'])
self.assertEqual(200, payload['writes'])
def test_notify_about_instance_usage(self):
instance = create_instance(self.context)
# Set some system metadata

View File

@ -370,7 +370,7 @@ notification_object_data = {
'AuditPeriodPayload': '1.0-2b429dd307b8374636703b843fa3f9cb',
'BandwidthPayload': '1.0-ee2616a7690ab78406842a2b68e34130',
'BlockDevicePayload': '1.0-29751e1b6d41b1454e36768a1e764df8',
'EventType': '1.16-0da423d66218567962410921f2542c41',
'EventType': '1.17-242397275522a04130b3af4c0ea926e2',
'ExceptionNotification': '1.0-a73147b93b520ff0061865849d3dfa56',
'ExceptionPayload': '1.1-6c43008bd81885a63bc7f7c629f0793b',
'FlavorNotification': '1.0-a73147b93b520ff0061865849d3dfa56',
@ -414,6 +414,8 @@ notification_object_data = {
'ServerGroupPayload': '1.1-4ded2997ea1b07038f7af33ef5c45f7f',
'ServiceStatusNotification': '1.0-a73147b93b520ff0061865849d3dfa56',
'ServiceStatusPayload': '1.1-7b6856bd879db7f3ecbcd0ca9f35f92f',
'VolumeUsageNotification': '1.0-a73147b93b520ff0061865849d3dfa56',
'VolumeUsagePayload': '1.0-5f99d8b978a32040eecac0975e5a53e9',
}

View File

@ -409,6 +409,15 @@ class FakeDriver(driver.ComputeDriver):
a given host.
"""
volusage = []
if compute_host_bdms:
volusage = [{'volume': compute_host_bdms[0][
'instance_bdms'][0]['volume_id'],
'instance': compute_host_bdms[0]['instance'],
'rd_bytes': 0,
'rd_req': 0,
'wr_bytes': 0,
'wr_req': 0}]
return volusage
def get_host_cpu_stats(self):