Add tenant/ user id to volume usage notifications

Volume usage notifications are generated to enable traffic based billing
on volumes. Include tenant id and user id to make these notifications
more useful to billing systems.

Fixes bug # 1158292

Change-Id: Ic71c10f0fc5d9e8c5a0e2f538de072e7ccca20ee
This commit is contained in:
Ollie Leahy 2013-03-21 13:16:05 +00:00
parent 86f303f511
commit a993b2b969
10 changed files with 267 additions and 20 deletions

View File

@ -290,7 +290,9 @@ def usage_volume_info(vol_usage):
usage_info = dict(
volume_id=vol_usage['volume_id'],
instance_id=vol_usage['instance_id'],
tenant_id=vol_usage['project_id'],
user_id=vol_usage['user_id'],
instance_id=vol_usage['instance_uuid'],
last_refreshed=null_safe_str(last_refreshed_time),
reads=vol_usage['tot_reads'] + vol_usage['curr_reads'],
read_bytes=vol_usage['tot_read_bytes'] +

View File

@ -302,8 +302,9 @@ class ConductorManager(manager.Manager):
wr_bytes, instance, last_refreshed=None,
update_totals=False):
self.db.vol_usage_update(context, vol_id, rd_req, rd_bytes, wr_req,
wr_bytes, instance['uuid'], last_refreshed,
update_totals)
wr_bytes, instance['uuid'],
instance['project_id'], instance['user_id'],
last_refreshed, update_totals)
@rpc_common.client_exceptions(exception.ComputeHostNotFound,
exception.HostBinaryNotFound)

View File

@ -1466,11 +1466,12 @@ def vol_get_usage_by_time(context, begin):
def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
instance_id, last_refreshed=None, update_totals=False):
instance_id, project_id, user_id,
last_refreshed=None, update_totals=False):
"""Update cached volume usage for a volume
Creates new record if needed."""
return IMPL.vol_usage_update(context, id, rd_req, rd_bytes, wr_req,
wr_bytes, instance_id,
wr_bytes, instance_id, project_id, user_id,
last_refreshed=last_refreshed,
update_totals=update_totals)

View File

@ -4188,8 +4188,8 @@ def vol_get_usage_by_time(context, begin):
@require_context
def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
instance_id, last_refreshed=None, update_totals=False,
session=None):
instance_id, project_id, user_id, last_refreshed=None,
update_totals=False, session=None):
if not session:
session = get_session()
@ -4206,7 +4206,9 @@ def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
'curr_read_bytes': rd_bytes,
'curr_writes': wr_req,
'curr_write_bytes': wr_bytes,
'instance_id': instance_id}
'instance_uuid': instance_id,
'project_id': project_id,
'user_id': user_id}
else:
values = {'tot_last_refreshed': last_refreshed,
'tot_reads': models.VolumeUsage.tot_reads + rd_req,
@ -4219,7 +4221,9 @@ def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
'curr_read_bytes': 0,
'curr_writes': 0,
'curr_write_bytes': 0,
'instance_id': instance_id}
'instance_uuid': instance_id,
'project_id': project_id,
'user_id': user_id}
rows = model_query(context, models.VolumeUsage,
session=session, read_deleted="yes").\
@ -4233,6 +4237,9 @@ def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
vol_usage.tot_last_refreshed = timeutils.utcnow()
vol_usage.curr_last_refreshed = timeutils.utcnow()
vol_usage.volume_id = id
vol_usage.instance_uuid = instance_id
vol_usage.project_id = project_id
vol_usage.user_id = user_id
if not update_totals:
vol_usage.curr_reads = rd_req

View File

@ -0,0 +1,53 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import MetaData, Integer, String, Table, Column
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
# Allow the instance_id to be stored as a uuid, add columns for project_id
# and tenant_id.
volume_usage_cache = Table('volume_usage_cache', meta, autoload=True)
volume_usage_cache.drop_column('instance_id')
instance_id = Column('instance_uuid', String(36))
project_id = Column('project_id', String(36))
user_id = Column('user_id', String(36))
volume_usage_cache.create_column(instance_id)
volume_usage_cache.create_column(project_id)
volume_usage_cache.create_column(user_id)
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
volume_usage_cache = Table('volume_usage_cache', meta, autoload=True)
volume_usage_cache.drop_column('instance_uuid')
volume_usage_cache.drop_column('user_id')
volume_usage_cache.drop_column('project_id')
instance_id = Column('instance_id', Integer)
volume_usage_cache.create_column(instance_id)

View File

@ -888,7 +888,9 @@ class VolumeUsage(BASE, NovaBase):
__tablename__ = 'volume_usage_cache'
id = Column(Integer, primary_key=True, nullable=False)
volume_id = Column(String(36), nullable=False)
instance_id = Column(Integer)
instance_uuid = Column(String(36))
project_id = Column(String(36))
user_id = Column(String(36))
tot_last_refreshed = Column(DateTime)
tot_reads = Column(BigInteger, default=0)
tot_read_bytes = Column(BigInteger, default=0)

View File

@ -332,6 +332,120 @@ class ComputeVolumeTestCase(BaseTestCase):
block_device_mapping)
self.assertEqual(self.cinfo.get('serial'), self.volume_id)
def test_poll_volume_usage_disabled(self):
ctxt = 'MockContext'
self.mox.StubOutWithMock(self.compute, '_get_host_volume_bdms')
self.mox.StubOutWithMock(utils, 'last_completed_audit_period')
# None of the mocks should be called.
self.mox.ReplayAll()
CONF.volume_usage_poll_interval = 0
self.compute._poll_volume_usage(ctxt)
self.mox.UnsetStubs()
def test_poll_volume_usage_interval_not_elapsed(self):
ctxt = 'MockContext'
self.mox.StubOutWithMock(self.compute, '_get_host_volume_bdms')
self.mox.StubOutWithMock(utils, 'last_completed_audit_period')
self.mox.StubOutWithMock(self.compute.driver, 'get_all_volume_usage')
self.mox.StubOutWithMock(self.compute,
'_send_volume_usage_notifications')
self.mox.StubOutWithMock(time, 'time')
# Following methods will be called.
utils.last_completed_audit_period().AndReturn((0, 0))
time.time().AndReturn(10)
self.mox.ReplayAll()
CONF.volume_usage_poll_interval = 2
self.compute._last_vol_usage_poll = 9
self.compute._poll_volume_usage(ctxt)
self.mox.UnsetStubs()
def test_poll_volume_usage_returns_no_vols(self):
ctxt = 'MockContext'
self.compute.host = 'MockHost'
self.mox.StubOutWithMock(self.compute, '_get_host_volume_bdms')
self.mox.StubOutWithMock(utils, 'last_completed_audit_period')
self.mox.StubOutWithMock(self.compute.driver, 'get_all_volume_usage')
self.mox.StubOutWithMock(self.compute,
'_send_volume_usage_notifications')
# Following methods are called.
utils.last_completed_audit_period().AndReturn((0, 0))
self.compute._get_host_volume_bdms(ctxt, 'MockHost').AndReturn([])
self.mox.ReplayAll()
CONF.volume_usage_poll_interval = 10
self.compute._last_vol_usage_poll = 0
self.compute._poll_volume_usage(ctxt)
self.mox.UnsetStubs()
def test_poll_volume_usage_with_data(self):
ctxt = 'MockContext'
self.compute.host = 'MockHost'
curr_time = time.time()
self.mox.StubOutWithMock(utils, 'last_completed_audit_period')
self.mox.StubOutWithMock(self.compute, '_get_host_volume_bdms')
self.mox.StubOutWithMock(timeutils, 'utcnow')
self.mox.StubOutWithMock(self.compute, '_update_volume_usage_cache')
self.mox.StubOutWithMock(self.compute,
'_send_volume_usage_notifications')
self.stubs.Set(self.compute.driver, 'get_all_volume_usage',
lambda x, y: [3, 4])
# All the mocks are called
utils.last_completed_audit_period().AndReturn((10, 20))
self.compute._get_host_volume_bdms(ctxt, 'MockHost').AndReturn([1, 2])
timeutils.utcnow().AndReturn(5)
self.compute._update_volume_usage_cache(ctxt, [3, 4], 5)
self.compute._send_volume_usage_notifications(ctxt, 20)
self.mox.ReplayAll()
CONF.volume_usage_poll_interval = 10
self.compute._last_vol_usage_poll = 0
self.compute._poll_volume_usage(ctxt)
self.assertTrue((curr_time < self.compute._last_vol_usage_poll),
"_last_vol_usage_poll was not properly updated <%s>" %
self.compute._last_vol_usage_poll)
self.mox.UnsetStubs()
def test_send_volume_usage_notifications(self):
ctxt = 'MockContext'
test_notifier.NOTIFICATIONS = []
self.compute.host = 'MockHost'
fake_usage = {'tot_last_refreshed': 20,
'curr_last_refreshed': 10,
'volume_id': 'fake_volume_id',
'instance_uuid': 'fake_instance_uuid',
'project_id': 'fake_project_id',
'user_id': 'fake_user_id',
'tot_reads': 11,
'curr_reads': 22,
'tot_read_bytes': 33,
'curr_read_bytes': 44,
'tot_writes': 55,
'curr_writes': 66,
'tot_write_bytes': 77,
'curr_write_bytes': 88}
self.stubs.Set(self.compute.conductor_api,
'vol_get_usage_by_time',
lambda x, y:
[db.sqlalchemy.models.VolumeUsage(**fake_usage)])
self.stubs.Set(self.compute.conductor_api,
'instance_get_all_by_filters',
lambda x, y: [{'project_id': 'fake_project_id',
'user_id': 'fake_user_id'}])
self.compute._send_volume_usage_notifications(ctxt, 20)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
msg = test_notifier.NOTIFICATIONS[0]
payload = msg['payload']
self.assertEquals(payload['instance_id'], 'fake_instance_uuid')
self.assertEquals(payload['user_id'], 'fake_user_id')
self.assertEquals(payload['tenant_id'], 'fake_project_id')
self.assertEquals(payload['reads'], 33)
self.assertEquals(payload['read_bytes'], 77)
self.assertEquals(payload['writes'], 121)
self.assertEquals(payload['write_bytes'], 165)
class ComputeTestCase(BaseTestCase):
def test_wrap_instance_fault(self):

View File

@ -399,13 +399,16 @@ class _BaseTestCase(object):
def test_vol_usage_update(self):
self.mox.StubOutWithMock(db, 'vol_usage_update')
db.vol_usage_update(self.context, 'fake-vol', 'rd-req', 'rd-bytes',
'wr-req', 'wr-bytes', 'fake-id', 'fake-refr',
'wr-req', 'wr-bytes', 'fake-id',
'fake-project_id', 'fake-user_id', 'fake-refr',
'fake-bool')
self.mox.ReplayAll()
self.conductor.vol_usage_update(self.context, 'fake-vol', 'rd-req',
'rd-bytes', 'wr-req', 'wr-bytes',
{'uuid': 'fake-id'}, 'fake-refr',
'fake-bool')
{'uuid': 'fake-id',
'project_id': 'fake-project_id',
'user_id': 'fake-user_id'},
'fake-refr', 'fake-bool')
def test_ping(self):
result = self.conductor.ping(self.context, 'foo')

View File

@ -2372,11 +2372,17 @@ class VolumeUsageDBApiTestCase(test.TestCase):
refreshed_time = now - datetime.timedelta(seconds=5)
expected_vol_usages = [{'volume_id': u'1',
'instance_uuid': 'fake-instance-uuid1',
'project_id': 'fake-project-uuid1',
'user_id': 'fake-user-uuid1',
'curr_reads': 1000,
'curr_read_bytes': 2000,
'curr_writes': 3000,
'curr_write_bytes': 4000},
{'volume_id': u'2',
'instance_uuid': 'fake-instance-uuid2',
'project_id': 'fake-project-uuid2',
'user_id': 'fake-user-uuid2',
'curr_reads': 100,
'curr_read_bytes': 200,
'curr_writes': 300,
@ -2390,13 +2396,20 @@ class VolumeUsageDBApiTestCase(test.TestCase):
self.assertEqual(len(vol_usages), 0)
vol_usage = db.vol_usage_update(ctxt, 1, rd_req=10, rd_bytes=20,
wr_req=30, wr_bytes=40, instance_id=1)
wr_req=30, wr_bytes=40,
instance_id='fake-instance-uuid1',
project_id='fake-project-uuid1',
user_id='fake-user-uuid1')
vol_usage = db.vol_usage_update(ctxt, 2, rd_req=100, rd_bytes=200,
wr_req=300, wr_bytes=400,
instance_id=1)
instance_id='fake-instance-uuid2',
project_id='fake-project-uuid2',
user_id='fake-user-uuid2')
vol_usage = db.vol_usage_update(ctxt, 1, rd_req=1000, rd_bytes=2000,
wr_req=3000, wr_bytes=4000,
instance_id=1,
instance_id='fake-instance-uuid1',
project_id='fake-project-uuid1',
user_id='fake-user-uuid1',
last_refreshed=refreshed_time)
vol_usages = db.vol_get_usage_by_time(ctxt, start_time)
@ -2411,6 +2424,9 @@ class VolumeUsageDBApiTestCase(test.TestCase):
timeutils.set_time_override(now)
start_time = now - datetime.timedelta(seconds=10)
expected_vol_usages = {'volume_id': u'1',
'project_id': 'fake-project-uuid',
'user_id': 'fake-user-uuid',
'instance_uuid': 'fake-instance-uuid',
'tot_reads': 600,
'tot_read_bytes': 800,
'tot_writes': 1000,
@ -2422,17 +2438,25 @@ class VolumeUsageDBApiTestCase(test.TestCase):
vol_usage = db.vol_usage_update(ctxt, 1, rd_req=100, rd_bytes=200,
wr_req=300, wr_bytes=400,
instance_id=1)
instance_id='fake-instance-uuid',
project_id='fake-project-uuid',
user_id='fake-user-uuid')
vol_usage = db.vol_usage_update(ctxt, 1, rd_req=200, rd_bytes=300,
wr_req=400, wr_bytes=500,
instance_id=1,
instance_id='fake-instance-uuid',
project_id='fake-project-uuid',
user_id='fake-user-uuid',
update_totals=True)
vol_usage = db.vol_usage_update(ctxt, 1, rd_req=300, rd_bytes=400,
wr_req=500, wr_bytes=600,
instance_id=1)
instance_id='fake-instance-uuid',
project_id='fake-project-uuid',
user_id='fake-user-uuid')
vol_usage = db.vol_usage_update(ctxt, 1, rd_req=400, rd_bytes=500,
wr_req=600, wr_bytes=700,
instance_id=1,
instance_id='fake-instance-uuid',
project_id='fake-project-uuid',
user_id='fake-user-uuid',
update_totals=True)
vol_usages = db.vol_get_usage_by_time(ctxt, start_time)

View File

@ -1229,6 +1229,46 @@ class TestNovaMigrations(BaseMigrationTestCase, CommonTestsMixIn):
it_projects.insert().execute,
dict(instance_type=31, project_id='pr1', deleted=0))
# migration 175, Modify volume_usage-cache, Drop column instance_id, add
# columns instance_uuid, project_id and user_id
def _pre_upgrade_175(self, engine):
volume_usage_cache = get_table(engine, 'volume_usage_cache')
fake_usage = {'volume_id': 'fake_volume_id',
'instance_id': 10,
'tot_last_refreshed': datetime.datetime.now(),
'tot_reads': 2,
'tot_read_bytes': 3,
'tot_writes': 4,
'tot_write_bytes': 5,
'curr_last_refreshed': datetime.datetime.now(),
'curr_reads': 6,
'curr_read_bytes': 7,
'curr_writes': 8,
'curr_write_bytes': 9}
volume_usage_cache.insert().execute(fake_usage)
def _check_175(self, engine, data):
volume_usage_cache = get_table(engine, 'volume_usage_cache')
# Get the record
rows = volume_usage_cache.select().execute().fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0]['instance_uuid'], None)
self.assertEqual(rows[0]['project_id'], None)
self.assertEqual(rows[0]['user_id'], None)
self.assertFalse('instance_id' in rows[0])
def _post_downgrade_175(self, engine):
volume_usage_cache = get_table(engine, 'volume_usage_cache')
# Get the record
rows = volume_usage_cache.select().execute().fetchall()
self.assertEqual(len(rows), 1)
self.assertFalse('instance_uuid' in rows[0])
self.assertFalse('project_id' in rows[0])
self.assertFalse('user_id' in rows[0])
self.assertEqual(rows[0]['instance_id'], None)
class TestBaremetalMigrations(BaseMigrationTestCase, CommonTestsMixIn):
"""Test sqlalchemy-migrate migrations."""