Sending volume IO usage broken
This is my fault, I didn't test the sending of notifications enough when fixing bug 1182102, when the volume IO usage notifications were been sent too often. This was merged as part of https://review.openstack.org/#/c/29915 The issue was that the code worked for the first IO usage event (the one I looked at). Then when constructing the event payload for the second and following IO usage events the code path ended up comparing a datetime object from the db with a cached string representation of the last_refreshed date that was passed into the conductor via the RPC. Since last_refreshed argument to vol_usage_update in the db layer is not needed and is causing these issues. I have removed last_refreshed argument from the db layer and deprecated it from the conduction API. Change-Id: I2030eb7912c56134ea688a6e8bbfcdeddca28307
This commit is contained in:
@@ -3782,7 +3782,7 @@ class ComputeManager(manager.SchedulerDependentManager):
|
||||
|
||||
return compute_host_bdms
|
||||
|
||||
def _update_volume_usage_cache(self, context, vol_usages, refreshed):
|
||||
def _update_volume_usage_cache(self, context, vol_usages):
|
||||
"""Updates the volume usage cache table with a list of stats."""
|
||||
for usage in vol_usages:
|
||||
# Allow switching of greenthreads between queries.
|
||||
@@ -3792,8 +3792,7 @@ class ComputeManager(manager.SchedulerDependentManager):
|
||||
usage['rd_bytes'],
|
||||
usage['wr_req'],
|
||||
usage['wr_bytes'],
|
||||
usage['instance'],
|
||||
last_refreshed=refreshed)
|
||||
usage['instance'])
|
||||
|
||||
@periodic_task.periodic_task
|
||||
def _poll_volume_usage(self, context, start_time=None):
|
||||
@@ -3820,8 +3819,7 @@ class ComputeManager(manager.SchedulerDependentManager):
|
||||
except NotImplementedError:
|
||||
return
|
||||
|
||||
refreshed = timeutils.utcnow()
|
||||
self._update_volume_usage_cache(context, vol_usages, refreshed)
|
||||
self._update_volume_usage_cache(context, vol_usages)
|
||||
|
||||
@periodic_task.periodic_task
|
||||
def _report_driver_status(self, context):
|
||||
|
||||
@@ -290,8 +290,13 @@ def usage_volume_info(vol_usage):
|
||||
|
||||
tot_refreshed = vol_usage['tot_last_refreshed']
|
||||
curr_refreshed = vol_usage['curr_last_refreshed']
|
||||
last_refreshed_time = (tot_refreshed if tot_refreshed > curr_refreshed
|
||||
else curr_refreshed)
|
||||
if tot_refreshed and curr_refreshed:
|
||||
last_refreshed_time = max(tot_refreshed, curr_refreshed)
|
||||
elif tot_refreshed:
|
||||
last_refreshed_time = tot_refreshed
|
||||
else:
|
||||
# curr_refreshed must be set
|
||||
last_refreshed_time = curr_refreshed
|
||||
|
||||
usage_info = dict(
|
||||
volume_id=vol_usage['volume_id'],
|
||||
|
||||
@@ -334,6 +334,8 @@ class ConductorManager(manager.Manager):
|
||||
result = self.db.vol_get_usage_by_time(context, start_time)
|
||||
return jsonutils.to_primitive(result)
|
||||
|
||||
# NOTE(kerrin): The last_refreshed argument is unused by this method
|
||||
# and can be removed in v2.0 of the RPC API.
|
||||
def vol_usage_update(self, context, vol_id, rd_req, rd_bytes, wr_req,
|
||||
wr_bytes, instance, last_refreshed=None,
|
||||
update_totals=False):
|
||||
@@ -347,7 +349,7 @@ class ConductorManager(manager.Manager):
|
||||
instance['project_id'],
|
||||
instance['user_id'],
|
||||
instance['availability_zone'],
|
||||
last_refreshed, update_totals,
|
||||
update_totals,
|
||||
session)
|
||||
|
||||
# We have just updated the database, so send the notification now
|
||||
|
||||
@@ -1469,14 +1469,12 @@ def vol_get_usage_by_time(context, begin):
|
||||
|
||||
def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
|
||||
instance_id, project_id, user_id, availability_zone,
|
||||
last_refreshed=None, update_totals=False,
|
||||
session=None):
|
||||
update_totals=False, session=None):
|
||||
"""Update cached volume usage for a volume
|
||||
Creates new record if needed."""
|
||||
return IMPL.vol_usage_update(context, id, rd_req, rd_bytes, wr_req,
|
||||
wr_bytes, instance_id, project_id, user_id,
|
||||
availability_zone,
|
||||
last_refreshed=last_refreshed,
|
||||
update_totals=update_totals,
|
||||
session=session)
|
||||
|
||||
|
||||
@@ -4392,19 +4392,18 @@ def vol_get_usage_by_time(context, begin):
|
||||
@require_context
|
||||
def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
|
||||
instance_id, project_id, user_id, availability_zone,
|
||||
last_refreshed=None, update_totals=False, session=None):
|
||||
update_totals=False, session=None):
|
||||
if not session:
|
||||
session = get_session()
|
||||
|
||||
if last_refreshed is None:
|
||||
last_refreshed = timeutils.utcnow()
|
||||
refreshed = timeutils.utcnow()
|
||||
|
||||
with session.begin():
|
||||
values = {}
|
||||
# NOTE(dricco): We will be mostly updating current usage records vs
|
||||
# updating total or creating records. Optimize accordingly.
|
||||
if not update_totals:
|
||||
values = {'curr_last_refreshed': last_refreshed,
|
||||
values = {'curr_last_refreshed': refreshed,
|
||||
'curr_reads': rd_req,
|
||||
'curr_read_bytes': rd_bytes,
|
||||
'curr_writes': wr_req,
|
||||
@@ -4414,7 +4413,7 @@ def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
|
||||
'user_id': user_id,
|
||||
'availability_zone': availability_zone}
|
||||
else:
|
||||
values = {'tot_last_refreshed': last_refreshed,
|
||||
values = {'tot_last_refreshed': refreshed,
|
||||
'tot_reads': models.VolumeUsage.tot_reads + rd_req,
|
||||
'tot_read_bytes': models.VolumeUsage.tot_read_bytes +
|
||||
rd_bytes,
|
||||
@@ -4443,7 +4442,6 @@ def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
|
||||
"the database. Instance must have been rebooted "
|
||||
"or crashed. Updating totals.") % id)
|
||||
if not update_totals:
|
||||
values['tot_last_refreshed'] = last_refreshed
|
||||
values['tot_reads'] = (models.VolumeUsage.tot_reads +
|
||||
current_usage['curr_reads'])
|
||||
values['tot_read_bytes'] = (
|
||||
@@ -4472,8 +4470,6 @@ def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
|
||||
return current_usage
|
||||
|
||||
vol_usage = models.VolumeUsage()
|
||||
vol_usage.tot_last_refreshed = timeutils.utcnow()
|
||||
vol_usage.curr_last_refreshed = timeutils.utcnow()
|
||||
vol_usage.volume_id = id
|
||||
vol_usage.instance_uuid = instance_id
|
||||
vol_usage.project_id = project_id
|
||||
@@ -4481,11 +4477,13 @@ def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
|
||||
vol_usage.availability_zone = availability_zone
|
||||
|
||||
if not update_totals:
|
||||
vol_usage.curr_last_refreshed = refreshed
|
||||
vol_usage.curr_reads = rd_req
|
||||
vol_usage.curr_read_bytes = rd_bytes
|
||||
vol_usage.curr_writes = wr_req
|
||||
vol_usage.curr_write_bytes = wr_bytes
|
||||
else:
|
||||
vol_usage.tot_last_refreshed = refreshed
|
||||
vol_usage.tot_reads = rd_req
|
||||
vol_usage.tot_read_bytes = rd_bytes
|
||||
vol_usage.tot_writes = wr_req
|
||||
|
||||
@@ -427,15 +427,13 @@ class ComputeVolumeTestCase(BaseTestCase):
|
||||
curr_time = time.time()
|
||||
self.mox.StubOutWithMock(utils, 'last_completed_audit_period')
|
||||
self.mox.StubOutWithMock(self.compute, '_get_host_volume_bdms')
|
||||
self.mox.StubOutWithMock(timeutils, 'utcnow')
|
||||
self.mox.StubOutWithMock(self.compute, '_update_volume_usage_cache')
|
||||
self.stubs.Set(self.compute.driver, 'get_all_volume_usage',
|
||||
lambda x, y: [3, 4])
|
||||
# All the mocks are called
|
||||
utils.last_completed_audit_period().AndReturn((10, 20))
|
||||
self.compute._get_host_volume_bdms(ctxt, 'MockHost').AndReturn([1, 2])
|
||||
timeutils.utcnow().AndReturn(5)
|
||||
self.compute._update_volume_usage_cache(ctxt, [3, 4], 5)
|
||||
self.compute._update_volume_usage_cache(ctxt, [3, 4])
|
||||
self.mox.ReplayAll()
|
||||
CONF.volume_usage_poll_interval = 10
|
||||
self.compute._last_vol_usage_poll = 0
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
|
||||
"""Tests for the conductor service."""
|
||||
|
||||
import datetime
|
||||
|
||||
import mox
|
||||
|
||||
from nova.api.ec2 import ec2utils
|
||||
@@ -376,35 +378,24 @@ class _BaseTestCase(object):
|
||||
def test_vol_usage_update(self):
|
||||
# the vol_usage_update method sends the volume usage notifications
|
||||
# as well as updating the database
|
||||
self.mox.StubOutWithMock(db, 'vol_usage_update')
|
||||
now = datetime.datetime(1, 1, 1)
|
||||
self.mox.StubOutWithMock(timeutils, 'utcnow')
|
||||
# nova.context
|
||||
timeutils.utcnow().AndReturn(0)
|
||||
# vol_usage_update 1
|
||||
timeutils.utcnow().AndReturn(now)
|
||||
# openstack.common.notifier
|
||||
timeutils.utcnow().AndReturn(now)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
inst = self._create_fake_instance({
|
||||
'project_id': 'fake-project_id',
|
||||
'user_id': 'fake-user_id',
|
||||
})
|
||||
fake_usage = {'tot_last_refreshed': 20,
|
||||
'curr_last_refreshed': 10,
|
||||
'volume_id': 'fake-vol',
|
||||
'instance_uuid': inst['uuid'],
|
||||
'project_id': 'fake-project_id',
|
||||
'user_id': 'fake-user_id',
|
||||
'availability_zone': 'fake-az',
|
||||
'tot_reads': 11,
|
||||
'curr_reads': 22,
|
||||
'tot_read_bytes': 33,
|
||||
'curr_read_bytes': 44,
|
||||
'tot_writes': 55,
|
||||
'curr_writes': 66,
|
||||
'tot_write_bytes': 77,
|
||||
'curr_write_bytes': 88}
|
||||
db.vol_usage_update(self.context, 'fake-vol', 'rd-req', 'rd-bytes',
|
||||
'wr-req', 'wr-bytes', inst['uuid'],
|
||||
'fake-project_id', 'fake-user_id', 'fake-az',
|
||||
'fake-refr', 'fake-bool', mox.IgnoreArg()).\
|
||||
AndReturn(fake_usage)
|
||||
self.mox.ReplayAll()
|
||||
self.conductor.vol_usage_update(self.context, 'fake-vol', 'rd-req',
|
||||
'rd-bytes', 'wr-req', 'wr-bytes',
|
||||
inst, 'fake-refr', 'fake-bool')
|
||||
|
||||
self.conductor.vol_usage_update(self.context, 'fake-vol',
|
||||
22, 33, 44, 55, inst,
|
||||
'2013-06-05T16:53:27.0', False)
|
||||
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
|
||||
msg = test_notifier.NOTIFICATIONS[0]
|
||||
@@ -412,11 +403,72 @@ class _BaseTestCase(object):
|
||||
self.assertEquals(payload['instance_id'], inst['uuid'])
|
||||
self.assertEquals(payload['user_id'], 'fake-user_id')
|
||||
self.assertEquals(payload['tenant_id'], 'fake-project_id')
|
||||
self.assertEquals(payload['reads'], 33)
|
||||
self.assertEquals(payload['read_bytes'], 77)
|
||||
self.assertEquals(payload['writes'], 121)
|
||||
self.assertEquals(payload['write_bytes'], 165)
|
||||
self.assertEquals(payload['reads'], 22)
|
||||
self.assertEquals(payload['read_bytes'], 33)
|
||||
self.assertEquals(payload['writes'], 44)
|
||||
self.assertEquals(payload['write_bytes'], 55)
|
||||
self.assertEquals(payload['availability_zone'], 'fake-az')
|
||||
self.assertEquals(payload['last_refreshed'], '0001-01-01 00:00:00')
|
||||
|
||||
# We need to unset and verify that we call the timutils.utcnow method
|
||||
# correctly now, as this method gets called as part of the setup
|
||||
# for the ConductorAPITestCase testcase.
|
||||
self.mox.UnsetStubs()
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_vol_usage_update_again(self):
|
||||
# Test updating the volume usage a second time and make sure that
|
||||
# the database queries to update and generate the volume usage
|
||||
# event payload continue to work
|
||||
now = datetime.datetime(1, 1, 1, 0, 0, 0)
|
||||
self.mox.StubOutWithMock(timeutils, 'utcnow')
|
||||
# nova.context
|
||||
timeutils.utcnow().AndReturn(0)
|
||||
|
||||
# vol_usage_update call
|
||||
timeutils.utcnow().AndReturn(now)
|
||||
# openstack.common.notifier
|
||||
timeutils.utcnow().AndReturn(now)
|
||||
|
||||
now2 = datetime.datetime(1, 1, 1, 0, 1, 0)
|
||||
# vol_usage_update second call
|
||||
timeutils.utcnow().AndReturn(now2)
|
||||
# openstack.common.notifier
|
||||
timeutils.utcnow().AndReturn(now2)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
|
||||
inst = self._create_fake_instance({
|
||||
'project_id': 'fake-project_id',
|
||||
'user_id': 'fake-user_id',
|
||||
})
|
||||
|
||||
self.conductor.vol_usage_update(self.context, 'fake-vol',
|
||||
22, 33, 44, 55, inst,
|
||||
'2013-06-05T16:53:27.0', False)
|
||||
|
||||
self.conductor.vol_usage_update(self.context, 'fake-vol',
|
||||
122, 133, 144, 155, inst,
|
||||
'2013-06-05T16:53:27.0', False)
|
||||
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
|
||||
msg = test_notifier.NOTIFICATIONS[1]
|
||||
payload = msg['payload']
|
||||
self.assertEquals(payload['instance_id'], inst['uuid'])
|
||||
self.assertEquals(payload['user_id'], 'fake-user_id')
|
||||
self.assertEquals(payload['tenant_id'], 'fake-project_id')
|
||||
self.assertEquals(payload['reads'], 122)
|
||||
self.assertEquals(payload['read_bytes'], 133)
|
||||
self.assertEquals(payload['writes'], 144)
|
||||
self.assertEquals(payload['write_bytes'], 155)
|
||||
self.assertEquals(payload['availability_zone'], 'fake-az')
|
||||
self.assertEquals(payload['last_refreshed'], '0001-01-01 00:01:00')
|
||||
|
||||
# We need to unset and verify that we call the timutils.utcnow method
|
||||
# correctly now, as this method gets called as part of the setup
|
||||
# for the ConductorAPITestCase testcase.
|
||||
self.mox.UnsetStubs()
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_compute_node_create(self):
|
||||
self.mox.StubOutWithMock(db, 'compute_node_create')
|
||||
|
||||
@@ -3719,7 +3719,12 @@ class VolumeUsageDBApiTestCase(test.TestCase):
|
||||
ctxt = context.get_admin_context()
|
||||
now = timeutils.utcnow()
|
||||
start_time = now - datetime.timedelta(seconds=10)
|
||||
refreshed_time = now - datetime.timedelta(seconds=5)
|
||||
|
||||
self.mox.StubOutWithMock(timeutils, 'utcnow')
|
||||
timeutils.utcnow().AndReturn(now)
|
||||
timeutils.utcnow().AndReturn(now)
|
||||
timeutils.utcnow().AndReturn(now)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
expected_vol_usages = [{'volume_id': u'1',
|
||||
'instance_uuid': 'fake-instance-uuid1',
|
||||
@@ -3729,10 +3734,12 @@ class VolumeUsageDBApiTestCase(test.TestCase):
|
||||
'curr_read_bytes': 2000,
|
||||
'curr_writes': 3000,
|
||||
'curr_write_bytes': 4000,
|
||||
'curr_last_refreshed': now,
|
||||
'tot_reads': 0,
|
||||
'tot_read_bytes': 0,
|
||||
'tot_writes': 0,
|
||||
'tot_write_bytes': 0},
|
||||
'tot_write_bytes': 0,
|
||||
'tot_last_refreshed': None},
|
||||
{'volume_id': u'2',
|
||||
'instance_uuid': 'fake-instance-uuid2',
|
||||
'project_id': 'fake-project-uuid2',
|
||||
@@ -3744,7 +3751,8 @@ class VolumeUsageDBApiTestCase(test.TestCase):
|
||||
'tot_reads': 0,
|
||||
'tot_read_bytes': 0,
|
||||
'tot_writes': 0,
|
||||
'tot_write_bytes': 0}]
|
||||
'tot_write_bytes': 0,
|
||||
'tot_last_refreshed': None}]
|
||||
|
||||
def _compare(vol_usage, expected):
|
||||
for key, value in expected.items():
|
||||
@@ -3770,8 +3778,7 @@ class VolumeUsageDBApiTestCase(test.TestCase):
|
||||
instance_id='fake-instance-uuid1',
|
||||
project_id='fake-project-uuid1',
|
||||
user_id='fake-user-uuid1',
|
||||
availability_zone='fake-az',
|
||||
last_refreshed=refreshed_time)
|
||||
availability_zone='fake-az')
|
||||
|
||||
vol_usages = db.vol_get_usage_by_time(ctxt, start_time)
|
||||
self.assertEqual(len(vol_usages), 2)
|
||||
@@ -3780,9 +3787,19 @@ class VolumeUsageDBApiTestCase(test.TestCase):
|
||||
|
||||
def test_vol_usage_update_totals_update(self):
|
||||
ctxt = context.get_admin_context()
|
||||
now = timeutils.utcnow()
|
||||
now = datetime.datetime(1, 1, 1, 1, 0, 0)
|
||||
start_time = now - datetime.timedelta(seconds=10)
|
||||
|
||||
self.mox.StubOutWithMock(timeutils, 'utcnow')
|
||||
timeutils.utcnow().AndReturn(now)
|
||||
now1 = now + datetime.timedelta(minutes=1)
|
||||
timeutils.utcnow().AndReturn(now1)
|
||||
now2 = now + datetime.timedelta(minutes=2)
|
||||
timeutils.utcnow().AndReturn(now2)
|
||||
now3 = now + datetime.timedelta(minutes=3)
|
||||
timeutils.utcnow().AndReturn(now3)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
vol_usage = db.vol_usage_update(ctxt, 1, rd_req=100, rd_bytes=200,
|
||||
wr_req=300, wr_bytes=400,
|
||||
instance_id='fake-instance-uuid',
|
||||
@@ -3833,14 +3850,16 @@ class VolumeUsageDBApiTestCase(test.TestCase):
|
||||
'tot_read_bytes': 800,
|
||||
'tot_writes': 1000,
|
||||
'tot_write_bytes': 1200,
|
||||
'tot_last_refreshed': now3,
|
||||
'curr_reads': 0,
|
||||
'curr_read_bytes': 0,
|
||||
'curr_writes': 0,
|
||||
'curr_write_bytes': 0}
|
||||
'curr_write_bytes': 0,
|
||||
'curr_last_refreshed': now2}
|
||||
|
||||
self.assertEquals(1, len(vol_usages))
|
||||
for key, value in expected_vol_usages.items():
|
||||
self.assertEqual(vol_usages[0][key], value)
|
||||
self.assertEqual(vol_usages[0][key], value, key)
|
||||
|
||||
def test_vol_usage_update_when_blockdevicestats_reset(self):
|
||||
ctxt = context.get_admin_context()
|
||||
|
||||
Reference in New Issue
Block a user