Changes to billing code

JIRA: NCP-1966

* removed no longer necessary fields from messages
* replaced strings with constants
* added config options to enable/disable notifications

Change-Id: I34194b6e646c772d89e9dd1dae72016b0a04f068
Closes-Bug: 1594942
This commit is contained in:
Alexander Medvedev
2016-06-23 11:16:48 -05:00
parent 1206afe7a5
commit 51690906a7
7 changed files with 79 additions and 71 deletions

View File

@@ -44,6 +44,7 @@ NOTE: assumes that the beginning of a billing cycle is midnight.
import datetime
from neutron.common import rpc as n_rpc
from oslo_config import cfg
from oslo_log import log as logging
from sqlalchemy import and_, or_, null
from quark.db import models
@@ -51,17 +52,29 @@ from quark.db import models
from quark import network_strategy
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
quark_opts = [
cfg.StrOpt('notify_ip_add', default=False,
help=_("Sends notifications when IP is added")),
cfg.StrOpt('notify_ip_delete', default=False,
help=_("Sends notifications when IP is deleted")),
cfg.StrOpt('notify_flip_associate', default=False,
help=_("Sends notifications when FLIP is associated")),
cfg.StrOpt('notify_flip_disassociate', default=False,
help=_("Sends notifications when FLIP is disassociated")),
cfg.StrOpt('notify_ip_exists', default=False,
help=_("Sends 'ip.exists' notifications"))
]
CONF.register_opts(quark_opts, 'QUARK')
PUBLIC_NETWORK_ID = network_strategy.STRATEGY.get_public_net_id()
# NOTE: this will most likely go away to be done in yagi
EVENT_TYPE_2_CLOUDFEEDS = {
'ip.exists': 'USAGE',
'ip.add': 'CREATE',
'ip.delete': 'DELETE',
'ip.associate': 'UP',
'ip.disassociate': 'DOWN'
}
IP_ADD = 'ip.add'
IP_DEL = 'ip.delete'
IP_ASSOC = 'ip.associate'
IP_DISASSOC = 'ip.disassociate'
IP_EXISTS = 'ip.exists'
def do_notify(context, event_type, payload):
@@ -92,9 +105,17 @@ def notify(context, event_type, ipaddress, send_usage=False):
nothing
Notes: this may live in the billing module
"""
if (event_type == IP_ADD and not CONF.QUARK.notify_ip_add) or \
(event_type == IP_DEL and not CONF.QUARK.notify_ip_delete) or \
(event_type == IP_ASSOC and not CONF.QUARK.notify_flip_associate) or \
(event_type == IP_DISASSOC and not CONF.QUARK.notify_flip_disassociate)\
or (event_type == IP_EXISTS and not CONF.QUARK.notify_ip_exists):
LOG.debug('IP_BILL: notification {} is disabled by config'.
format(event_type))
# ip.add needs the allocated_at time.
# All other events need the current time.
ts = ipaddress.allocated_at if event_type == 'ip.add' else _now()
ts = ipaddress.allocated_at if event_type == IP_ADD else _now()
payload = build_payload(ipaddress, event_type, event_time=ts)
# Send the notification with the payload
@@ -115,10 +136,10 @@ def notify(context, event_type, ipaddress, send_usage=False):
else:
start_time = _midnight_today()
payload = build_payload(ipaddress,
'ip.exists',
IP_EXISTS,
start_time=start_time,
end_time=ts)
do_notify(context, 'ip.exists', payload)
do_notify(context, IP_EXISTS, payload)
def build_payload(ipaddress,
@@ -144,19 +165,16 @@ def build_payload(ipaddress,
"""
# This is the common part of all message types
payload = {
'event_type': unicode(EVENT_TYPE_2_CLOUDFEEDS[event_type]),
'event_type': unicode(event_type),
'tenant_id': unicode(ipaddress.used_by_tenant_id),
'ip_address': unicode(ipaddress.address_readable),
'subnet_id': unicode(ipaddress.subnet_id),
'network_id': unicode(ipaddress.network_id),
'public': True if ipaddress.network_id == PUBLIC_NETWORK_ID else False,
'ip_version': int(ipaddress.version),
'ip_type': unicode(ipaddress.address_type),
'id': unicode(ipaddress.id)
}
# Depending on the message type add the appropriate fields
if event_type == 'ip.exists':
if event_type == IP_EXISTS:
if start_time is None or end_time is None:
raise ValueError('IP_BILL: {} start_time/end_time cannot be empty'
.format(event_type))
@@ -164,31 +182,17 @@ def build_payload(ipaddress,
'startTime': unicode(convert_timestamp(start_time)),
'endTime': unicode(convert_timestamp(end_time))
})
elif event_type == 'ip.add':
elif event_type in [IP_ADD, IP_DEL, IP_ASSOC, IP_DISASSOC]:
if event_time is None:
raise ValueError('IP_BILL: {}: event_time cannot be NULL'
.format(event_type))
payload.update({
'eventTime': unicode(convert_timestamp(event_time)),
'subnet_id': unicode(ipaddress.subnet_id),
'network_id': unicode(ipaddress.network_id),
'public': True if ipaddress.network_id == PUBLIC_NETWORK_ID
else False,
})
elif event_type == 'ip.delete':
if event_time is None:
raise ValueError('IP_BILL: {}: event_time cannot be NULL'
.format(event_type))
payload.update({
'eventTime': unicode(convert_timestamp(event_time))
})
elif event_type == 'ip.associate' or event_type == 'ip.disassociate':
if event_time is None:
raise ValueError('IP_BILL: {}: event_time cannot be NULL'
.format(event_type))
# only pass floating ip addresses through this
if ipaddress.address_type not in ['floating', 'scaling']:
raise ValueError('IP_BILL: {} only valid for floating IPs'.
format(event_type),
' got {} instead'.format(ipaddress.address_type))
payload.update({'eventTime': unicode(convert_timestamp(event_time))})
else:
raise ValueError('IP_BILL: bad event_type: {}'.format(event_type))

View File

@@ -32,6 +32,9 @@ from oslo_db import exception as db_exception
from oslo_log import log as logging
from oslo_utils import timeutils
from quark.billing import IP_ADD
from quark.billing import IP_DEL
from quark.billing import IP_DISASSOC
from quark.billing import notify
from quark.db import api as db_api
from quark.db import ip_types
@@ -558,7 +561,7 @@ class QuarkIpam(object):
ip_types.FIXED))
# alexm: need to notify from here because this code
# does not go through the _allocate_from_subnet() path.
notify(context, 'ip.add', address)
notify(context, IP_ADD, address)
return address
except db_exception.DBDuplicateEntry:
# This shouldn't ever happen, since we hold a unique MAC
@@ -701,7 +704,7 @@ class QuarkIpam(object):
if self.is_strategy_satisfied(new_addresses, allocate_complete=True):
# Only notify when all went well
for address in new_addresses:
notify(context, 'ip.add', address)
notify(context, IP_ADD, address)
LOG.info("IPAM for port ID {0} completed with addresses "
"{1}".format(port_id,
[a["address_readable"]
@@ -718,7 +721,7 @@ class QuarkIpam(object):
address["deallocated"] = 1
address["address_type"] = None
notify(context, 'ip.delete', address, send_usage=True)
notify(context, IP_DEL, address, send_usage=True)
def deallocate_ips_by_port(self, context, port=None, **kwargs):
ips_to_remove = []
@@ -768,7 +771,7 @@ class QuarkIpam(object):
# SQLAlchemy caching.
context.session.add(flip)
context.session.flush()
notify(context, 'ip.disassociate', flip)
notify(context, IP_DISASSOC, flip)
driver = registry.DRIVER_REGISTRY.get_driver()
driver.remove_floating_ip(flip)
elif len(flip.fixed_ips) > 1:
@@ -782,7 +785,7 @@ class QuarkIpam(object):
context, flip, fix_ip)
context.session.add(flip)
context.session.flush()
notify(context, 'ip.disassociate', flip)
notify(context, IP_DISASSOC, flip)
else:
remaining_fixed_ips.append(fix_ip)
port_fixed_ips = {}

View File

@@ -166,7 +166,7 @@ def _create_flip(context, flip, port_fixed_ips):
raise
# alexm: Notify from this method for consistency with _delete_flip
billing.notify(context, 'ip.associate', flip)
billing.notify(context, billing.IP_ASSOC, flip)
def _get_flip_fixed_ip_by_port_id(flip, port_id):
@@ -188,8 +188,8 @@ def _update_flip(context, flip_id, ip_type, requested_ports):
# This list will hold flips that require notifications.
# Using sets to avoid dups, if any.
notifications = {
'ip.associate': set(),
'ip.disassociate': set()
billing.IP_ASSOC: set(),
billing.IP_DISASSOC: set()
}
context.session.begin()
@@ -233,7 +233,7 @@ def _update_flip(context, flip_id, ip_type, requested_ports):
for port_id in removed_port_ids:
port = db_api.port_find(context, id=port_id, scope=db_api.ONE)
flip = db_api.port_disassociate_ip(context, [port], flip)
notifications['ip.disassociate'].add(flip)
notifications[billing.IP_DISASSOC].add(flip)
fixed_ip = _get_flip_fixed_ip_by_port_id(flip, port_id)
if fixed_ip:
flip = db_api.floating_ip_disassociate_fixed_ip(
@@ -257,7 +257,7 @@ def _update_flip(context, flip_id, ip_type, requested_ports):
raise q_exc.NoAvailableFixedIpsForPort(port_id=port_id)
port_fixed_ips[port_id] = {'port': port, 'fixed_ip': fixed_ip}
flip = db_api.port_associate_ip(context, [port], flip, [port_id])
notifications['ip.associate'].add(flip)
notifications[billing.IP_ASSOC].add(flip)
flip = db_api.floating_ip_associate_fixed_ip(context, flip,
fixed_ip)
@@ -330,7 +330,7 @@ def _delete_flip(context, id, address_type):
# alexm: Notify from this method because we don't have the flip object
# in the callers
billing.notify(context, 'ip.disassociate', flip)
billing.notify(context, billing.IP_DISASSOC, flip)
def create_floatingip(context, content):

View File

@@ -20,6 +20,8 @@ import mock
import netaddr
from neutron.common import exceptions as ex
from quark.billing import IP_ASSOC
from quark.billing import IP_DISASSOC
from quark.db import models
from quark import exceptions as q_ex
from quark.plugin_modules import floating_ips
@@ -595,7 +597,7 @@ class TestUpdateFloatingIPs(test_quark_plugin.TestQuarkPlugin):
dict(floatingip=content))
self.assertEqual(ret["fixed_ip_address"], "192.168.0.1")
self.assertEqual(ret["port_id"], new_port["id"])
notify.assert_called_once_with(self.context, 'ip.associate',
notify.assert_called_once_with(self.context, IP_ASSOC,
mock.ANY)
def test_update_with_new_port(self):
@@ -621,8 +623,8 @@ class TestUpdateFloatingIPs(test_quark_plugin.TestQuarkPlugin):
self.assertEqual(ret["fixed_ip_address"], "192.168.0.1")
self.assertEqual(ret["port_id"], new_port["id"])
self.assertEqual(notify.call_count, 2, 'Should notify twice here')
call_list = [mock.call(self.context, 'ip.disassociate', mock.ANY),
mock.call(self.context, 'ip.associate', mock.ANY)]
call_list = [mock.call(self.context, IP_DISASSOC, mock.ANY),
mock.call(self.context, IP_ASSOC, mock.ANY)]
notify.assert_has_calls(call_list, any_order=True)
def test_update_with_no_port(self):
@@ -637,7 +639,7 @@ class TestUpdateFloatingIPs(test_quark_plugin.TestQuarkPlugin):
dict(floatingip=content))
self.assertEqual(ret.get("fixed_ip_address"), None)
self.assertEqual(ret.get("port_id"), None)
notify.assert_called_once_with(self.context, 'ip.disassociate',
notify.assert_called_once_with(self.context, IP_DISASSOC,
mock.ANY)
def test_update_with_non_existent_port_should_fail(self):

View File

@@ -84,21 +84,15 @@ class QuarkBillingPayloadTest(QuarkBillingBaseTest):
ipaddress.allocated_at = start_time
ipaddress.deallocated_at = end_time
ipaddress.address_type = 'fixed'
payload = billing.build_payload(ipaddress, 'ip.exists',
payload = billing.build_payload(ipaddress, billing.IP_EXISTS,
start_time=start_time,
end_time=end_time)
self.assertEqual(payload['event_type'], u'USAGE',
self.assertEqual(payload['event_type'], billing.IP_EXISTS,
'event_type is wrong')
self.assertEqual(payload['tenant_id'], TENANT_ID,
'tenant_id is wrong')
self.assertEqual(payload['ip_address'], IP_READABLE,
'ip_address is wrong')
self.assertEqual(payload['subnet_id'], SUBNET_ID,
'subnet_id is wrong')
self.assertEqual(payload['network_id'], PUB_NETWORK_ID,
'network_id is wrong')
self.assertEqual(payload['public'], True,
'public should be true')
self.assertEqual(payload['ip_version'], 4,
'ip_version should be 4')
self.assertEqual(payload['ip_type'], 'fixed',
@@ -118,9 +112,10 @@ class QuarkBillingPayloadTest(QuarkBillingBaseTest):
ipaddress.allocated_at = event_time
ipaddress.deallocated_at = event_time
ipaddress.address_type = 'floating'
payload = billing.build_payload(ipaddress, 'ip.associate',
payload = billing.build_payload(ipaddress, billing.IP_ASSOC,
event_time=event_time)
self.assertEqual(payload['event_type'], u'UP', 'event_type is wrong')
self.assertEqual(payload['event_type'], billing.IP_ASSOC,
'event_type is wrong')
self.assertEqual(payload['tenant_id'], TENANT_ID, 'tenant_id is wrong')
self.assertEqual(payload['ip_address'], IP_READABLE,
'ip_address is wrong')
@@ -143,9 +138,10 @@ class QuarkBillingPayloadTest(QuarkBillingBaseTest):
ipaddress.allocated_at = event_time
ipaddress.deallocated_at = event_time
ipaddress.address_type = 'floating'
payload = billing.build_payload(ipaddress, 'ip.disassociate',
payload = billing.build_payload(ipaddress, billing.IP_DISASSOC,
event_time=event_time)
self.assertEqual(payload['event_type'], u'DOWN', 'event_type is wrong')
self.assertEqual(payload['event_type'], billing.IP_DISASSOC,
'event_type is wrong')
self.assertEqual(payload['tenant_id'], TENANT_ID, 'tenant_id is wrong')
self.assertEqual(payload['ip_address'], IP_READABLE,
'ip_address is wrong')

View File

@@ -26,6 +26,9 @@ from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_utils import timeutils
from quark.billing import IP_ADD
from quark.billing import IP_DEL
from quark.billing import IP_EXISTS
from quark.db import models
from quark import exceptions as q_exc
import quark.ipam
@@ -1870,7 +1873,7 @@ class QuarkIPAddressAllocationNotifications(QuarkIpamBaseTest):
notify.assert_called_once_with("network")
notify.return_value.info.assert_called_once_with(
self.context,
"ip.add",
IP_ADD,
mock.ANY)
def test_deallocation_notification(self):
@@ -1892,8 +1895,8 @@ class QuarkIPAddressAllocationNotifications(QuarkIpamBaseTest):
'Should have called notify twice')
# When we deallocate an IP we must send a usage message as well
# Verify that we called both methods. Order matters.
call_list = [mock.call(self.context, 'ip.delete', mock.ANY),
mock.call(self.context, 'ip.exists', mock.ANY)]
call_list = [mock.call(self.context, IP_DEL, mock.ANY),
mock.call(self.context, IP_EXISTS, mock.ANY)]
notify.return_value.info.assert_has_calls(call_list,
any_order=False)

View File

@@ -84,27 +84,27 @@ def main(notify, hour, minute):
for ipaddress in full_day_ips:
click.echo('start: {}, end: {}'.format(period_start, period_end))
payload = billing.build_payload(ipaddress,
'ip.exists',
billing.IP_EXISTS,
start_time=period_start,
end_time=period_end)
billing.do_notify(context,
'ip.exists',
billing.IP_EXISTS,
payload)
# '==================== Part Day ============================='
for ipaddress in partial_day_ips:
click.echo('start: {}, end: {}'.format(period_start, period_end))
payload = billing.build_payload(ipaddress,
'ip.exists',
billing.IP_EXISTS,
start_time=ipaddress.allocated_at,
end_time=period_end)
billing.do_notify(context,
'ip.exists',
billing.IP_EXISTS,
payload)
else:
click.echo('Case 1 ({}):\n'.format(len(full_day_ips)))
for ipaddress in full_day_ips:
pp(billing.build_payload(ipaddress,
'ip.exists',
billing.IP_EXISTS,
start_time=period_start,
end_time=period_end))
@@ -113,7 +113,7 @@ def main(notify, hour, minute):
click.echo('Case 2 ({}):\n'.format(len(partial_day_ips)))
for ipaddress in partial_day_ips:
pp(billing.build_payload(ipaddress,
'ip.exists',
billing.IP_EXISTS,
start_time=ipaddress.allocated_at,
end_time=period_end))