From dd086ee5f9d0ec7e0c6eeae426d2959769def83c Mon Sep 17 00:00:00 2001
From: Marc Pilon
Date: Thu, 14 Nov 2013 16:50:04 -0500
Subject: [PATCH] [ADMIN_API] API side of MnB changes

These changes contain most of the API side of the MnB work that is
needed. This is still a work in progress: it does not yet contain the
MnB code for sending usage notifications.

Change-Id: I25ca671f65272709480f9788fa6d671eabe06046
---
 bin/mnbtest.py                                |  54 +++
 etc/libra.cfg                                 |  28 +-
 etc/mnb.cfg                                   |  39 ++
 libra/admin_api/__init__.py                   |  49 +++
 libra/admin_api/app.py                        |  25 +-
 libra/admin_api/device_pool/manage_pool.py    |   6 +-
 libra/admin_api/stats/billing_sched.py        | 192 +++++++++
 libra/admin_api/stats/offline_sched.py        | 161 ++++++++
 .../stats/{scheduler.py => ping_sched.py}     | 136 +------
 libra/admin_api/stats/stats_gearman.py        |  61 ++-
 libra/admin_api/stats/stats_sched.py          | 217 +++++++++++
 libra/api/controllers/load_balancers.py       |   7 +
 libra/common/api/gearman_client.py            |   3 +-
 libra/common/api/lbaas.py                     |  24 +-
 libra/common/api/lbaas.sql                    |  23 +-
 libra/common/api/mnb.py                       | 367 ++++++++++++++++++
 requirements.txt                              |   1 +
 17 files changed, 1248 insertions(+), 145 deletions(-)
 create mode 100755 bin/mnbtest.py
 create mode 100644 etc/mnb.cfg
 create mode 100644 libra/admin_api/stats/billing_sched.py
 create mode 100644 libra/admin_api/stats/offline_sched.py
 rename libra/admin_api/stats/{scheduler.py => ping_sched.py} (66%)
 create mode 100644 libra/admin_api/stats/stats_sched.py
 create mode 100644 libra/common/api/mnb.py

diff --git a/bin/mnbtest.py b/bin/mnbtest.py
new file mode 100755
index 00000000..cc67fbf8
--- /dev/null
+++ b/bin/mnbtest.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+##############################################################################
+# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#
+# See the License for the specific language governing permissions and
+# limitations under the License.
+############################################################################## +import logging as std_logging +import time + +from oslo.config import cfg +from libra.openstack.common import log as logging +from libra.common.api.mnb import update_mnb +from libra import __version__ + + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + +CONF.register_opts([ + cfg.IntOpt('testcount', + metavar='COUNT', + default=1, + help='Number of messages to send') +]) + + +def main(): + CONF(project='mnbtest', version=__version__) + logging.setup('mnbtest') + LOG.debug('Configuration:') + + print "Starting Test" + print "LOG FILE = {0}".format(CONF.log_file) + LOG.info('STARTING MNBTEST') + CONF.log_opt_values(LOG, std_logging.DEBUG) + + LOG.info("Calling update_mnb with {0} messages".format(CONF.testcount)) + update_mnb('lbaas.instance.test', CONF.testcount, 456) + + time.sleep(30) + +if __name__ == "__main__": + main() diff --git a/etc/libra.cfg b/etc/libra.cfg index 72e03a7b..26faa3ba 100644 --- a/etc/libra.cfg +++ b/etc/libra.cfg @@ -24,6 +24,28 @@ #syslog_faciltiy = local7 #logstash = HOST:PORT +# Openstack +#notification_driver = openstack.common.notifier.rpc_notifier +#default_notification_level = INFO +#default_publisher_id = id +#host = localhost + +# Kombu +rabbit_use_ssl = False +#kombu_ssl_version = '' +#kombu_ssl_keyfile = '' +#kombu_ssl_certfile = '' +#kombu_ssl_ca_certs = '' +#rabbit_host = localhost +#rabbit_port = 5672 +#rabbit_userid = user +#rabbit_password = password +#rabbit_hosts = +#rabbit_virtual_host = vhost +#rabbit_retry_interval = 1 +#rabbit_retry_backoff = 2 +#rabbit_max_retries = 0 +#rabbit_ha_queues = False #----------------------------------------------------------------------- # Options for utilities that are Gearman workers or clients. @@ -110,6 +132,10 @@ nova_tenant_id = TENANTID #stats_poll_timeout = 5 #stats_poll_timeout_retry = 30 #vip_pool_size = 10 +#billing_enable = False +#exists_freq = 60 +#usage_freq = 60 +#stats_freq = 5 # Required options db_sections = mysql1 @@ -125,7 +151,6 @@ datadog_tags = service:lbaas # Others - #----------------------------------------------------------------------- # The [api] section is specific to the libra_api utility. #----------------------------------------------------------------------- @@ -149,7 +174,6 @@ ssl_certfile = certfile.crt ssl_keyfile = keyfile.key ip_filters = 192.168.0.0/24 - #----------------------------------------------------------------------- # The [mysql*] sections are referenced by admin_api and api by the # db_sections values. 
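[Note to reviewers: every scheduler in this patch (ping, offline, stats,
usage, exists) re-arms a threading.Timer so its next run lands on a fixed
second of the minute (the *_timer_seconds options above), and uses the
current minute modulo number_of_servers to decide whose turn it is. A
minimal standalone sketch of that shared pattern, with a hypothetical
tick() callback standing in for the real check:

    import datetime
    import threading

    TIMER_SECONDS = 20      # e.g. stats_timer_seconds
    SERVER_ID = 0           # this server's [admin_api] server_id
    NUMBER_OF_SERVERS = 1   # [admin_api] number_of_servers

    def tick():
        # Only one admin API server takes a given minute
        if SERVER_ID == datetime.datetime.now().minute % NUMBER_OF_SERVERS:
            pass  # the real schedulers run their check here
        start_sched()  # re-arm so the next run hits the same second

    def start_sched():
        # Sleep until the wall clock next reads :TIMER_SECONDS
        seconds = datetime.datetime.now().second
        if seconds < TIMER_SECONDS:
            sleeptime = TIMER_SECONDS - seconds
        else:
            sleeptime = 60 - (seconds - TIMER_SECONDS)
        threading.Timer(sleeptime, tick, ()).start()

Staggering each task onto its own second keeps the timers from firing
concurrently on one server.]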
diff --git a/etc/mnb.cfg b/etc/mnb.cfg
new file mode 100644
index 00000000..5ac9305a
--- /dev/null
+++ b/etc/mnb.cfg
@@ -0,0 +1,39 @@
+########################################################################
+# Config for oslo notifier
+########################################################################
+
+[DEFAULT]
+# Options to enable more verbose output
+verbose = true
+debug = true
+use_stderr = true
+publish_errors = true
+logfile = /tmp/libra.log
+
+# Openstack
+notification_driver = drivername
+default_notification_level = INFO
+default_publisher_id = lbaas
+host = apiTest
+
+# Kombu
+rabbit_use_ssl = True
+rabbit_host = localhost
+rabbit_port = 5671
+rabbit_userid = user
+rabbit_password = password
+#rabbit_hosts =
+rabbit_virtual_host = vhost
+rabbit_retry_interval = 1
+rabbit_retry_backoff = 2
+rabbit_max_retries = 0
+rabbit_ha_queues = False
+fake_rabbit = False
+control_exchange = exchange
+amqp_durable_queues = True
+
+[admin_api]
+billing_enable = True
+exists_freq = 20
+logfile = /tmp/libra_admin.log
+db_sections = ''
diff --git a/libra/admin_api/__init__.py b/libra/admin_api/__init__.py
index 5086614f..dc400df0 100644
--- a/libra/admin_api/__init__.py
+++ b/libra/admin_api/__init__.py
@@ -86,6 +86,55 @@ cfg.CONF.register_opts(
     cfg.IntOpt('vip_pool_size',
                default=10,
                help='Number of hot spare vips to keep in the pool'),
+    cfg.BoolOpt('billing_enable',
+                default=False,
+                help='Enable / Disable billing notifications'),
+    cfg.BoolOpt('stats_enable',
+                default=False,
+                help='Enable / Disable usage statistics gathering'),
+    cfg.IntOpt('exists_freq',
+               metavar='MINUTES',
+               default=60,
+               help='Minutes between sending of billing exists messages'),
+    cfg.IntOpt('usage_freq',
+               metavar='MINUTES',
+               default=60,
+               help='Minutes between sending of billing usage messages'),
+    cfg.IntOpt('stats_freq',
+               metavar='MINUTES',
+               default=5,
+               help='Minutes between sending STATS requests to workers'),
+    cfg.BoolOpt('stats_purge_enable',
+                default=False,
+                help='Enable / Disable purging of usage statistics'),
+    cfg.IntOpt('stats_purge_days',
+               metavar='DAYS',
+               default=5,
+               help='Number of days to keep usage STATS before purging'),
+    cfg.IntOpt('delete_timer_seconds',
+               default=5,
+               help='Second of each minute the delete timer should run'),
+    cfg.IntOpt('ping_timer_seconds',
+               default=15,
+               help='Second of each minute the ping timer should run'),
+    cfg.IntOpt('stats_timer_seconds',
+               default=20,
+               help='Second of each minute the stats timer should run'),
+    cfg.IntOpt('usage_timer_seconds',
+               default=25,
+               help='Second of each minute the usage timer should run'),
+    cfg.IntOpt('probe_timer_seconds',
+               default=30,
+               help='Second of each minute the probe timer should run'),
+    cfg.IntOpt('offline_timer_seconds',
+               default=45,
+               help='Second of each minute the offline timer should run'),
+    cfg.IntOpt('vips_timer_seconds',
+               default=50,
+               help='Second of each minute the vips timer should run'),
+    cfg.IntOpt('exists_timer_seconds',
+               default=55,
+               help='Second of each minute the exists timer should run'),
 ],
     group=adminapi_group
 )
diff --git a/libra/admin_api/app.py b/libra/admin_api/app.py
index 715db454..2cda1d40 100644
--- a/libra/admin_api/app.py
+++ b/libra/admin_api/app.py
@@ -29,7 +29,10 @@ from eventlet import wsgi
 from libra import __version__
 from libra.common.api import server
 from libra.admin_api.stats.drivers.base import known_drivers
-from libra.admin_api.stats.scheduler import Stats
+from libra.admin_api.stats.ping_sched import PingStats
+from libra.admin_api.stats.offline_sched import 
OfflineStats +from libra.admin_api.stats.billing_sched import BillingStats +from libra.admin_api.stats.stats_sched import UsageStats from libra.admin_api.device_pool.manage_pool import Pool from libra.admin_api.expunge.expunge import ExpungeScheduler from libra.admin_api import config as api_config @@ -97,13 +100,27 @@ class MaintThreads(object): self.run_threads() def run_threads(self): - stats = Stats(self.drivers) + pool = Pool() - expunge = ExpungeScheduler() - self.classes.append(stats) self.classes.append(pool) + + expunge = ExpungeScheduler() self.classes.append(expunge) + pings = PingStats(self.drivers) + self.classes.append(pings) + + offline = OfflineStats(self.drivers) + self.classes.append(offline) + + if CONF['admin_api'].stats_enable: + usage = UsageStats(self.drivers) + self.classes.append(usage) + + if CONF['admin_api'].billing_enable: + billing = BillingStats(self.drivers) + self.classes.append(billing) + def exit_handler(self, signum, frame): signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) diff --git a/libra/admin_api/device_pool/manage_pool.py b/libra/admin_api/device_pool/manage_pool.py index eac7c88a..2c456b3f 100644 --- a/libra/admin_api/device_pool/manage_pool.py +++ b/libra/admin_api/device_pool/manage_pool.py @@ -31,9 +31,9 @@ LOG = log.getLogger(__name__) class Pool(object): - DELETE_SECONDS = 05 - PROBE_SECONDS = 30 - VIPS_SECONDS = 50 + DELETE_SECONDS = cfg.CONF['admin_api'].delete_timer_seconds + PROBE_SECONDS = cfg.CONF['admin_api'].probe_timer_seconds + VIPS_SECONDS = cfg.CONF['admin_api'].vips_timer_seconds def __init__(self): self.probe_timer = None diff --git a/libra/admin_api/stats/billing_sched.py b/libra/admin_api/stats/billing_sched.py new file mode 100644 index 00000000..348bcbc6 --- /dev/null +++ b/libra/admin_api/stats/billing_sched.py @@ -0,0 +1,192 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import threading +import datetime + +from oslo.config import cfg +from libra.common.api.lbaas import Billing, db_session +from libra.common.api.mnb import update_mnb, test_mnb_connection +from libra.openstack.common import timeutils +from libra.openstack.common import log as logging +from sqlalchemy.sql import func + + +LOG = logging.getLogger(__name__) + + +class BillingStats(object): + + EXISTS_SECONDS = cfg.CONF['admin_api'].exists_timer_seconds + USAGE_SECONDS = cfg.CONF['admin_api'].usage_timer_seconds + + def __init__(self, drivers): + self.drivers = drivers + self.usage_timer = None + self.exists_timer = None + self.server_id = cfg.CONF['admin_api']['server_id'] + self.number_of_servers = cfg.CONF['admin_api']['number_of_servers'] + self.exists_freq = cfg.CONF['admin_api'].exists_freq + self.usage_freq = cfg.CONF['admin_api'].usage_freq + self.start_usage_sched() + self.start_exists_sched() + + def shutdown(self): + if self.usage_timer: + self.usage_timer.cancel() + if self.exists_timer: + self.exists_timer.cancel() + + def update_usage(self): + # Work out if it is our turn to run + minute = datetime.datetime.now().minute + if self.server_id != minute % self.number_of_servers: + self.start_usage_sched() + return + + # Send periodic usage notifications + try: + self._exec_usage() + except Exception: + LOG.exception('Uncaught exception during billing usage update') + + # Need to restart timer after every billing cycle + self.start_usage_sched() + + def update_exists(self): + # Work out if it is our turn to run + minute = datetime.datetime.now().minute + if self.server_id != minute % self.number_of_servers: + self.start_exists_sched() + return + + # Send periodic exists notifications + try: + self._exec_exists() + except Exception: + LOG.exception('Uncaught exception during billing exists update') + + # Need to restart timer after every billing cycle + self.start_exists_sched() + + def _exec_exists(self): + with db_session() as session: + # Check if it's time to send exists notifications + delta = datetime.timedelta(minutes=self.exists_freq) + exp = timeutils.utcnow() - delta + exp_time = exp.strftime('%Y-%m-%d %H:%M:%S') + + updated = session.query( + Billing.last_update + ).filter(Billing.name == "exists").\ + filter(Billing.last_update > exp_time).\ + first() + + if updated is not None: + # Not time yet + LOG.info('Not time to send exists notifications yet {0}'. + format(exp_time)) + session.rollback() + return + + # Check the connection before sending the notifications + if not test_mnb_connection(): + # Abort the exists notifications + LOG.info("Aborting exists notifications. Could not connect") + session.rollback() + return + + #Update the exists timestamp now + session.query(Billing).\ + filter(Billing.name == "exists").\ + update({"last_update": func.now()}, + synchronize_session='fetch') + session.commit() + + # Send the notifications + update_mnb('lbaas.instance.exists', None, None) + + def _exec_usage(self): + with db_session() as session: + # Next check if it's time to send bandwidth usage notifications + delta = datetime.timedelta(minutes=self.usage_freq) + exp = timeutils.utcnow() - delta + + start, = session.query( + Billing.last_update + ).filter(Billing.name == "usage").\ + first() + + if start and start > exp: + # Not time yet + LOG.info('Not time to send usage statistics yet {0}'. 
+                         format(exp))
+                session.rollback()
+                return
+
+            # Check the connection before sending the notifications
+            if not test_mnb_connection():
+                # Abort the usage notifications
+                LOG.info("Aborting usage notifications. Could not connect")
+                session.rollback()
+                return
+
+            # Calculate the stopping point by rounding backward to the nearest
+            # N minutes. e.g. if N = 60, this will round us back to HH:00:00,
+            # or if N = 15, it will round us back to HH:15:00, HH:30:00,
+            # HH:45:00, or HH:00:00, whichever is closest.
+            N = cfg.CONF['admin_api'].usage_freq
+            now = timeutils.utcnow()
+            stop = now - datetime.timedelta(minutes=now.minute % N,
+                                            seconds=now.second,
+                                            microseconds=now.microsecond)
+
+            # Release the lock
+            session.query(Billing).\
+                filter(Billing.name == "usage").\
+                update({"last_update": stop},
+                       synchronize_session='fetch')
+            session.commit()
+
+            # Send the usage notifications. Pass the timestamps to save
+            # queries.
+            update_mnb('lbaas.bandwidth.usage', start, stop)
+
+    def start_usage_sched(self):
+        # Always try to hit the expected second mark for usage
+        seconds = datetime.datetime.now().second
+        if seconds < self.USAGE_SECONDS:
+            sleeptime = self.USAGE_SECONDS - seconds
+        else:
+            sleeptime = 60 - (seconds - self.USAGE_SECONDS)
+
+        LOG.info('LB usage timer sleeping for {secs} seconds'
+                 .format(secs=sleeptime))
+        self.usage_timer =\
+            threading.Timer(sleeptime, self.update_usage, ())
+        self.usage_timer.start()
+
+    def start_exists_sched(self):
+        # Always try to hit the expected second mark for exists
+        seconds = datetime.datetime.now().second
+        if seconds < self.EXISTS_SECONDS:
+            sleeptime = self.EXISTS_SECONDS - seconds
+        else:
+            sleeptime = 60 - (seconds - self.EXISTS_SECONDS)
+
+        LOG.info('LB exists timer sleeping for {secs} seconds'
+                 .format(secs=sleeptime))
+        self.exists_timer =\
+            threading.Timer(sleeptime, self.update_exists, ())
+        self.exists_timer.start()
diff --git a/libra/admin_api/stats/offline_sched.py b/libra/admin_api/stats/offline_sched.py
new file mode 100644
index 00000000..065046ed
--- /dev/null
+++ b/libra/admin_api/stats/offline_sched.py
@@ -0,0 +1,161 @@
+# Copyright 2013 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import threading
+
+from datetime import datetime
+from oslo.config import cfg
+
+from libra.common.api.lbaas import Device, db_session
+from libra.admin_api.stats.stats_gearman import GearJobs
+from libra.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class OfflineStats(object):
+
+    OFFLINE_SECONDS = cfg.CONF['admin_api'].offline_timer_seconds
+
+    def __init__(self, drivers):
+        self.drivers = drivers
+        self.offline_timer = None
+        self.ping_limit = cfg.CONF['admin_api']['stats_offline_ping_limit']
+        self.error_limit = cfg.CONF['admin_api']['stats_device_error_limit']
+        self.server_id = cfg.CONF['admin_api']['server_id']
+        self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
+
+        self.start_offline_sched()
+
+    def shutdown(self):
+        if self.offline_timer:
+            self.offline_timer.cancel()
+
+    def check_offline_lbs(self):
+        # Work out if it is our turn to run
+        minute = datetime.now().minute
+        if self.server_id != minute % self.number_of_servers:
+            LOG.info('Not our turn to run OFFLINE check, sleeping')
+            self.start_offline_sched()
+            return
+        tested = 0
+        failed = 0
+        try:
+            tested, failed = self._exec_offline_check()
+        except Exception:
+            LOG.exception('Uncaught exception during OFFLINE check')
+        # Need to restart timer after every ping cycle
+        LOG.info(
+            '{tested} OFFLINE loadbalancers tested, {failed} failed'
+            .format(tested=tested, failed=failed)
+        )
+        self.start_offline_sched()
+
+    def _exec_offline_check(self):
+        tested = 0
+        failed = 0
+        node_list = []
+        LOG.info('Running OFFLINE check')
+        with db_session() as session:
+            # Get all devices currently marked OFFLINE
+            devices = session.query(
+                Device.id, Device.name
+            ).filter(Device.status == 'OFFLINE').all()
+
+            tested = len(devices)
+            if tested == 0:
+                LOG.info('No OFFLINE Load Balancers to check')
+                return (0, 0)
+            for lb in devices:
+                node_list.append(lb.name)
+            gearman = GearJobs()
+            failed_lbs = gearman.offline_check(node_list)
+            failed = len(failed_lbs)
+            if failed > self.error_limit:
+                LOG.error(
+                    'Too many simultaneous Load Balancer Failures.'
+                    ' Aborting deletion attempt'
+                )
+                return tested, failed
+
+            if failed > 0:
+                self._send_delete(failed_lbs)
+
+            # Clear the ping counts for all devices not in
+            # the failed list
+            succeeded = list(set(node_list) - set(failed_lbs))
+            session.query(Device.name, Device.pingCount).\
+                filter(Device.name.in_(succeeded)).\
+                update({"pingCount": 0}, synchronize_session='fetch')
+
+            session.commit()
+
+        return tested, failed
+
+    def _send_delete(self, failed_nodes):
+        with db_session() as session:
+            for lb in failed_nodes:
+                # Get the current ping count
+                data = session.query(
+                    Device.id, Device.pingCount).\
+                    filter(Device.name == lb).first()
+
+                if not data:
+                    LOG.error(
+                        'Device {0} no longer exists'.format(lb)
+                    )
+                    continue
+
+                if data.pingCount < self.ping_limit:
+                    data.pingCount += 1
+                    LOG.error(
+                        'Offline Device {0} has failed {1} ping attempts'.
+                        format(lb, data.pingCount)
+                    )
+                    session.query(Device).\
+                        filter(Device.name == lb).\
+                        update({"pingCount": data.pingCount},
+                               synchronize_session='fetch')
+                    session.flush()
+                    continue
+
+                message = (
+                    'Load balancer {0} unreachable and marked for deletion'. 
+ format(lb) + ) + for driver in self.drivers: + instance = driver() + LOG.info( + 'Sending delete request for {0} to {1}'.format( + lb, instance.__class__.__name__ + ) + ) + instance.send_delete(message, data.id) + session.commit() + + def start_offline_sched(self): + # Always try to hit the expected second mark for offline checks + seconds = datetime.now().second + if seconds < self.OFFLINE_SECONDS: + sleeptime = self.OFFLINE_SECONDS - seconds + else: + sleeptime = 60 - (seconds - self.OFFLINE_SECONDS) + + LOG.info('LB offline check timer sleeping for {secs} seconds' + .format(secs=sleeptime)) + self.offline_timer = threading.Timer( + sleeptime, self.check_offline_lbs, () + ) + self.offline_timer.start() diff --git a/libra/admin_api/stats/scheduler.py b/libra/admin_api/stats/ping_sched.py similarity index 66% rename from libra/admin_api/stats/scheduler.py rename to libra/admin_api/stats/ping_sched.py index 34884e2c..5949257b 100644 --- a/libra/admin_api/stats/scheduler.py +++ b/libra/admin_api/stats/ping_sched.py @@ -16,29 +16,20 @@ import threading from datetime import datetime from oslo.config import cfg - from libra.common.api.lbaas import LoadBalancer, Device, Node, db_session -from libra.openstack.common import log +from libra.openstack.common import log as logging from libra.admin_api.stats.stats_gearman import GearJobs - -LOG = log.getLogger(__name__) +LOG = logging.getLogger(__name__) -class NodeNotFound(Exception): - pass +class PingStats(object): - -class Stats(object): - - PING_SECONDS = 15 - OFFLINE_SECONDS = 45 + PING_SECONDS = cfg.CONF['admin_api'].ping_timer_seconds def __init__(self, drivers): self.drivers = drivers self.ping_timer = None - self.offline_timer = None - self.ping_limit = cfg.CONF['admin_api']['stats_offline_ping_limit'] self.error_limit = cfg.CONF['admin_api']['stats_device_error_limit'] self.server_id = cfg.CONF['admin_api']['server_id'] self.number_of_servers = cfg.CONF['admin_api']['number_of_servers'] @@ -46,33 +37,10 @@ class Stats(object): LOG.info("Selected stats drivers: %s", self.stats_driver) self.start_ping_sched() - self.start_offline_sched() def shutdown(self): if self.ping_timer: self.ping_timer.cancel() - if self.offline_timer: - self.offline_timer.cancel() - - def check_offline_lbs(self): - # Work out if it is our turn to run - minute = datetime.now().minute - if self.server_id != minute % self.number_of_servers: - LOG.info('Not our turn to run OFFLINE check, sleeping') - self.start_offline_sched() - return - tested = 0 - failed = 0 - try: - tested, failed = self._exec_offline_check() - except Exception: - LOG.exception('Uncaught exception during OFFLINE check') - # Need to restart timer after every ping cycle - LOG.info( - '{tested} OFFLINE loadbalancers tested, {failed} failed' - .format(tested=tested, failed=failed) - ) - self.start_offline_sched() def ping_lbs(self): # Work out if it is our turn to run @@ -126,47 +94,6 @@ class Stats(object): return pings, failed - def _exec_offline_check(self): - tested = 0 - failed = 0 - node_list = [] - LOG.info('Running OFFLINE check') - with db_session() as session: - # Join to ensure device is in-use - devices = session.query( - Device.id, Device.name - ).filter(Device.status == 'OFFLINE').all() - - tested = len(devices) - if tested == 0: - LOG.info('No OFFLINE Load Balancers to check') - return (0, 0) - for lb in devices: - node_list.append(lb.name) - gearman = GearJobs() - failed_lbs = gearman.offline_check(node_list) - failed = len(failed_lbs) - if failed > self.error_limit: - LOG.error( - 'Too 
many simultaneous Load Balancer Failures.' - ' Aborting deletion attempt' - ) - return tested, failed - - if failed > 0: - self._send_delete(failed_lbs) - - # Clear the ping counts for all devices not in - # the failed list - succeeded = list(set(node_list) - set(failed_lbs)) - session.query(Device.name, Device.pingCount).\ - filter(Device.name.in_(succeeded)).\ - update({"pingCount": 0}, synchronize_session='fetch') - - session.commit() - - return tested, failed - def _send_fails(self, failed_lbs): with db_session() as session: for lb in failed_lbs: @@ -196,47 +123,6 @@ class Stats(object): instance.send_alert(message, data.id) session.commit() - def _send_delete(self, failed_nodes): - with db_session() as session: - for lb in failed_nodes: - # Get the current ping count - data = session.query( - Device.id, Device.pingCount).\ - filter(Device.name == lb).first() - - if not data: - LOG.error( - 'Device {0} no longer exists'.format(data.id) - ) - continue - - if data.pingCount < self.ping_limit: - data.pingCount += 1 - LOG.error( - 'Offline Device {0} has failed {1} ping attempts'. - format(lb, data.pingCount) - ) - session.query(Device).\ - filter(Device.name == lb).\ - update({"pingCount": data.pingCount}, - synchronize_session='fetch') - session.flush() - continue - - message = ( - 'Load balancer {0} unreachable and marked for deletion'. - format(lb) - ) - for driver in self.drivers: - instance = driver() - LOG.info( - 'Sending delete request for {0} to {1}'.format( - lb, instance.__class__.__name__ - ) - ) - instance.send_delete(message, data.id) - session.commit() - def _get_lb(self, lb, session): lb = session.query( LoadBalancer.tenantid, Device.floatingIpAddr, Device.id @@ -358,17 +244,3 @@ class Stats(object): LOG.info('LB ping check timer sleeping for %d seconds', sleeptime) self.ping_timer = threading.Timer(sleeptime, self.ping_lbs, ()) self.ping_timer.start() - - def start_offline_sched(self): - # Always try to hit the expected second mark for offline checks - seconds = datetime.now().second - if seconds < self.OFFLINE_SECONDS: - sleeptime = self.OFFLINE_SECONDS - seconds - else: - sleeptime = 60 - (seconds - self.OFFLINE_SECONDS) - - LOG.info('LB offline check timer sleeping for %d seconds', sleeptime) - self.offline_timer = threading.Timer( - sleeptime, self.check_offline_lbs, () - ) - self.offline_timer.start() diff --git a/libra/admin_api/stats/stats_gearman.py b/libra/admin_api/stats/stats_gearman.py index b8df85d5..342dd15c 100644 --- a/libra/admin_api/stats/stats_gearman.py +++ b/libra/admin_api/stats/stats_gearman.py @@ -47,7 +47,7 @@ class GearJobs(object): failed_list = [] node_status = dict() retry_list = [] - job_data = {"hpcs_action": "STATS"} + job_data = {"hpcs_action": "PING"} for node in node_list: list_of_jobs.append(dict(task=str(node), data=job_data)) submitted_pings = self.gm_client.submit_multiple_jobs( @@ -143,3 +143,62 @@ class GearJobs(object): if gearman_fail > max_fail_count: failed_list.append(ping.job.task) return failed_list + + def get_stats(self, node_list): + # TODO: lots of duplicated code that needs cleanup + list_of_jobs = [] + failed_list = [] + retry_list = [] + results = {} + job_data = {"hpcs_action": "STATS"} + for node in node_list: + list_of_jobs.append(dict(task=str(node), data=job_data)) + submitted_stats = self.gm_client.submit_multiple_jobs( + list_of_jobs, background=False, wait_until_complete=True, + poll_timeout=self.poll_timeout + ) + for stats in submitted_stats: + if stats.state == JOB_UNKNOWN: + # TODO: Gearman server failed, 
ignoring for now
+                retry_list.append(stats.job.task)
+                continue
+            if stats.timed_out:
+                # Timeout
+                retry_list.append(stats.job.task)
+                continue
+            if stats.result['hpcs_response'] == 'FAIL':
+                # Error returned by Gearman
+                failed_list.append(stats.job.task)
+            else:
+                #Success
+                results[stats.job.task] = stats.result
+
+        list_of_jobs = []
+        if len(retry_list) > 0:
+            LOG.info(
+                "{0} stats timed out, retrying".format(len(retry_list))
+            )
+            for node in retry_list:
+                list_of_jobs.append(dict(task=str(node), data=job_data))
+            submitted_stats = self.gm_client.submit_multiple_jobs(
+                list_of_jobs, background=False, wait_until_complete=True,
+                poll_timeout=self.poll_retry
+            )
+            for stats in submitted_stats:
+                if stats.state == JOB_UNKNOWN:
+                    # TODO: Gearman server failed, ignoring for now
+                    LOG.error(
+                        "Gearman Job server failed during STATS check of {0}".
+                        format(stats.job.task)
+                    )
+                    failed_list.append(stats.job.task)
+                    continue
+                if stats.timed_out:
+                    # Timeout
+                    failed_list.append(stats.job.task)
+                    continue
+                if stats.result['hpcs_response'] == 'FAIL':
+                    # Error returned by Gearman
+                    failed_list.append(stats.job.task)
+                else:
+                    #Success
+                    results[stats.job.task] = stats.result
+
+        return failed_list, results
diff --git a/libra/admin_api/stats/stats_sched.py b/libra/admin_api/stats/stats_sched.py
new file mode 100644
index 00000000..cee08efe
--- /dev/null
+++ b/libra/admin_api/stats/stats_sched.py
@@ -0,0 +1,217 @@
+# Copyright 2013 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import threading
+import datetime
+
+from oslo.config import cfg
+from libra.common.api.lbaas import LoadBalancer, Device, db_session
+from libra.common.api.lbaas import Billing, Stats
+from libra.admin_api.stats.stats_gearman import GearJobs
+from libra.openstack.common import timeutils
+from libra.openstack.common import log as logging
+from sqlalchemy.sql import func
+
+
+LOG = logging.getLogger(__name__)
+
+
+class UsageStats(object):
+
+    STATS_SECONDS = cfg.CONF['admin_api'].stats_timer_seconds
+
+    def __init__(self, drivers):
+        self.drivers = drivers
+        self.stats_timer = None
+        self.server_id = cfg.CONF['admin_api']['server_id']
+        self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
+        self.stats_freq = cfg.CONF['admin_api'].stats_freq
+        self.billing_enable = cfg.CONF['admin_api'].billing_enable
+
+        self.start_stats_sched()
+
+    def shutdown(self):
+        if self.stats_timer:
+            self.stats_timer.cancel()
+
+    def gather_stats(self):
+        # Work out if it is our turn to run
+        minute = datetime.datetime.now().minute
+        if self.server_id != minute % self.number_of_servers:
+            self.start_stats_sched()
+            return
+        total = 0
+        fail = 0
+        try:
+            fail, total = self._exec_stats()
+        except Exception:
+            LOG.exception('Uncaught exception during stats collection')
+
+        # Need to restart timer after every stats cycle
+        LOG.info('{total} lb device stats queried, {fail} failed'
+                 .format(total=total, fail=fail))
+        self.start_stats_sched()
+
+    def _exec_stats(self):
+        failed = 0
+        node_list = []
+        with db_session() as session:
+            delta = datetime.timedelta(minutes=self.stats_freq)
+            exp = timeutils.utcnow() - delta
+            exp_time = exp.strftime('%Y-%m-%d %H:%M:%S')
+
+            updated = session.query(
+                Billing.last_update
+            ).filter(Billing.name == "stats").\
+                filter(Billing.last_update > exp_time).\
+                first()
+
+            if updated is not None:
+                # Not time yet
+                LOG.info('Not time to gather stats yet {0}'.format(exp_time))
+                session.rollback()
+                return 0, 0
+
+            #Update the stats timestamp
+            session.query(Billing).\
+                filter(Billing.name == "stats").\
+                update({"last_update": func.now()},
+                       synchronize_session='fetch')
+
+            # Get all the online devices to query for stats
+            devices = session.query(
+                Device.id, Device.name
+            ).filter(Device.status == 'ONLINE').all()
+
+            if devices is None or len(devices) == 0:
+                LOG.error('No ONLINE devices to gather usage stats from')
+                session.rollback()
+                return 0, 0
+            total = len(devices)
+
+            for device in devices:
+                node_list.append(device.name)
+            gearman = GearJobs()
+            failed_list, results = gearman.get_stats(node_list)
+            failed = len(failed_list)
+
+            if failed > 0:
+                self._send_fails(failed_list)
+
+            if total > failed:
+                # We have some success
+                self._update_stats(results, failed_list)
+                session.commit()
+            else:
+                # Everything failed. 
Retry these on the next timer firing
+                session.rollback()
+
+        return failed, total
+
+    def _update_stats(self, results, failed_list):
+        with db_session() as session:
+            lbs = session.query(
+                LoadBalancer.id,
+                LoadBalancer.protocol,
+                LoadBalancer.status,
+                Device.name
+            ).join(LoadBalancer.devices).\
+                filter(Device.status == 'ONLINE').all()
+
+            if lbs is None:
+                session.rollback()
+                LOG.error('No Loadbalancers found when updating stats')
+                return
+
+            total = len(lbs)
+            added = 0
+            for lb in lbs:
+                if lb.name not in results:
+                    if lb.name not in failed_list:
+                        LOG.error(
+                            'No stats results found for Device {0}, LBID {1}'
+                            .format(lb.name, lb.id))
+                    continue
+
+                result = results[lb.name]
+                protocol = lb.protocol.lower()
+                if protocol != "http":
+                    # GALERA or TCP = TCP at the worker
+                    protocol = "tcp"
+
+                bytes_out = -1
+                for data in result["loadBalancers"]:
+                    if data["protocol"] == protocol:
+                        bytes_out = data["bytes_out"]
+
+                if bytes_out == -1:
+                    LOG.error(
+                        'No stats found for Device {0}, '
+                        'LBID {1}, protocol {2}'
+                        .format(lb.name, lb.id, protocol))
+                    continue
+
+                new_entry = Stats()
+                new_entry.lbid = lb.id
+                new_entry.period_start = result["utc_start"]
+                new_entry.period_end = result["utc_end"]
+                new_entry.bytes_out = bytes_out
+                new_entry.status = lb.status
+                session.add(new_entry)
+                session.flush()
+                added += 1
+            session.commit()
+            LOG.info(
+                '{total} loadbalancers stats queried, {fail} failed'
+                .format(total=total, fail=total - added))
+
+    def _send_fails(self, failed_list):
+        with db_session() as session:
+            for device_name in failed_list:
+                data = self._get_lb(device_name, session)
+                if not data:
+                    LOG.error(
+                        'Device {0} has no Loadbalancer attached during STATS'.
+                        format(device_name)
+                    )
+                    continue
+
+                LOG.error(
+                    'Load balancer failed STATS request '
+                    'ID: {0}\n'
+                    'IP: {1}\n'
+                    'tenant: {2}\n'.format(
+                        data.id, data.floatingIpAddr,
+                        data.tenantid))
+
+    def _get_lb(self, lb, session):
+        lb = session.query(
+            LoadBalancer.tenantid, Device.floatingIpAddr, Device.id
+        ).join(LoadBalancer.devices).\
+            filter(Device.name == lb).first()
+
+        return lb
+
+    def start_stats_sched(self):
+        # Always try to hit the expected second mark for stats
+        seconds = datetime.datetime.now().second
+        if seconds < self.STATS_SECONDS:
+            sleeptime = self.STATS_SECONDS - seconds
+        else:
+            sleeptime = 60 - (seconds - self.STATS_SECONDS)
+
+        LOG.info('LB stats timer sleeping for {secs} seconds'
+                 .format(secs=sleeptime))
+        self.stats_timer = threading.Timer(sleeptime, self.gather_stats, ())
+        self.stats_timer.start()
diff --git a/libra/api/controllers/load_balancers.py b/libra/api/controllers/load_balancers.py
index 92b01082..fb95ad81 100644
--- a/libra/api/controllers/load_balancers.py
+++ b/libra/api/controllers/load_balancers.py
@@ -33,6 +33,7 @@ from libra.common.exc import ExhaustedError
 from libra.api.model.validators import LBPut, LBPost, LBResp, LBVipResp
 from libra.api.model.validators import LBRespNode
 from libra.common.api.gearman_client import submit_job
+from libra.common.api.mnb import update_mnb
 from libra.api.acl import get_limited_to_project
 from libra.api.library.exp import OverLimit, IPOutOfRange, NotFound
 from libra.api.library.exp import ImmutableEntity, ImmutableStates
@@ -469,6 +470,9 @@
             'UPDATE', device.name, device.id, lb.id
         )
 
+        #Notify billing of the LB creation
+        update_mnb('lbaas.instance.create', lb.id, tenant_id)
+
         return return_data
 
     @wsme_pecan.wsexpose(None, body=LBPut, status_code=202)
@@ -579,6 +583,9 @@ class 
LoadBalancersController(RestController): submit_job( 'DELETE', device.name, device.id, lb.id ) + + #Notify billing of the LB deletion + update_mnb('lbaas.instance.delete', lb.id, tenant_id) return None def usage(self, load_balancer_id): diff --git a/libra/common/api/gearman_client.py b/libra/common/api/gearman_client.py index 58518cbf..c4faaad7 100644 --- a/libra/common/api/gearman_client.py +++ b/libra/common/api/gearman_client.py @@ -33,7 +33,8 @@ gearman_workers = [ 'DELETE', # Delete a Load Balancer. 'DISCOVER', # Return service discovery information. 'ARCHIVE', # Archive LB log files. - 'STATS' # Get load balancer statistics. + 'STATS', # Get load balancer statistics. + 'PING' # Ping load balancers ] diff --git a/libra/common/api/lbaas.py b/libra/common/api/lbaas.py index 9ec1790c..0f2e4d27 100644 --- a/libra/common/api/lbaas.py +++ b/libra/common/api/lbaas.py @@ -20,7 +20,7 @@ import time from oslo.config import cfg from pecan import conf from sqlalchemy import Table, Column, Integer, ForeignKey, create_engine -from sqlalchemy import INTEGER, VARCHAR, BIGINT +from sqlalchemy import INTEGER, VARCHAR, BIGINT, DATETIME from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship, backref, sessionmaker, Session @@ -151,6 +151,28 @@ class HealthMonitor(DeclarativeBase): path = Column(u'path', VARCHAR(length=2000)) +class Billing(DeclarativeBase): + __tablename__ = 'billing' + id = Column(u'id', Integer, primary_key=True, nullable=False) + name = Column(u'name', VARCHAR(length=128), nullable=False) + last_update = Column(u'last_update', DATETIME(), nullable=False) + + +class Stats(DeclarativeBase): + """stats model""" + __tablename__ = 'stats' + #column definitions + id = Column(u'id', BIGINT(), primary_key=True, nullable=False) + lbid = Column( + u'lbid', BIGINT(), ForeignKey('loadbalancers.id'), primary_key=True, + nullable=False + ) + period_start = Column(u'period_start', DATETIME(), nullable=False) + period_end = Column(u'period_end', DATETIME(), nullable=False) + bytes_out = Column(u'bytes_out', BIGINT(), nullable=False) + status = Column(u'status', VARCHAR(length=50), nullable=False) + + class RoutingSession(Session): """ Try to use the first engine provided. If this fails use the next in sequence and so on. Reset to the first after 60 seconds diff --git a/libra/common/api/lbaas.sql b/libra/common/api/lbaas.sql index 7c4987d3..3f49e371 100644 --- a/libra/common/api/lbaas.sql +++ b/libra/common/api/lbaas.sql @@ -72,7 +72,7 @@ CREATE TABLE monitors ( timeout INT NOT NULL, # Maximum number of seconds to wait for a connection to the node before it times out. attemptsBeforeDeactivation INT NOT NULL, # Number of permissible failures before removing a node from rotation. 1 to 10. path VARCHAR(2000) NULL, # The HTTP path used in the request by the monitor. 
Begins with /
-    PRIMARY KEY (lbid)                  # ids are unique accross all Nodes
+    PRIMARY KEY (lbid)                  # ids are unique across all Nodes
 ) DEFAULT CHARSET utf8 DEFAULT COLLATE utf8_general_ci;
 
 CREATE TABLE `pool_building` (
@@ -100,3 +100,24 @@ CREATE TABLE `global_limits` (
 
 INSERT INTO `global_limits` VALUES (1,'maxLoadBalancerNameLength',128),(2,'maxVIPsPerLoadBalancer',1),(3,'maxNodesPerLoadBalancer',50),(4,'maxLoadBalancers',20);
 
+# Billing
+CREATE TABLE billing (
+    id int(11) NOT NULL,
+    name varchar(128) NOT NULL,
+    last_update DATETIME NOT NULL DEFAULT '0000-00-00 00:00:00',  # timestamp of when the feature was last updated
+    PRIMARY KEY (id)
+) ENGINE=InnoDB DEFAULT CHARSET latin1;
+
+INSERT INTO billing VALUES (1, 'stats', '0000-00-00 00:00:00'),(2, 'usage', '0000-00-00 00:00:00'),(3, 'exists', '0000-00-00 00:00:00');
+
+# Stats
+CREATE TABLE stats (
+    id BIGINT NOT NULL AUTO_INCREMENT,                   # unique id for this billing record
+    lbid BIGINT NOT NULL REFERENCES loadbalancers(id),   # fk for lbid
+    period_start DATETIME NOT NULL,                      # timestamp of when this period started
+    period_end DATETIME NOT NULL,                        # timestamp of when this period ended
+    bytes_out BIGINT NOT NULL,                           # bytes transferred in this period
+    status VARCHAR(50) NOT NULL,                         # Current LB status
+    PRIMARY KEY (id)                                     # ids are unique across all LBs
+  ) ENGINE=InnoDB DEFAULT CHARSET latin1;
+ 
\ No newline at end of file
diff --git a/libra/common/api/mnb.py b/libra/common/api/mnb.py
new file mode 100644
index 00000000..d495d3f3
--- /dev/null
+++ b/libra/common/api/mnb.py
@@ -0,0 +1,367 @@
+# Copyright 2013 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import eventlet
+eventlet.monkey_patch()
+
+from oslo.config import cfg
+from libra.common.api.lbaas import LoadBalancer, db_session
+from libra.common.api.lbaas import Stats
+from libra.openstack.common.notifier import api as notifier_api
+from libra.openstack.common import timeutils
+from libra.openstack.common import log as logging
+from libra.openstack.common import rpc
+from libra.openstack.common.rpc import common as rpc_common
+from sqlalchemy.sql import func
+
+
+LOG = logging.getLogger(__name__)
+
+
+def update_mnb(event_type, lbid, tenant_id):
+    # Start a new thread
+    eventlet.spawn_n(client_job, event_type, lbid, tenant_id)
+
+
+def client_job(event_type, lbid, tenant_id):
+
+    try:
+        if (event_type == 'lbaas.instance.create' or
+                event_type == 'lbaas.instance.delete'):
+            _send_create_or_delete(event_type, lbid, tenant_id)
+        elif event_type == 'lbaas.instance.exists':
+            _send_exists(event_type)
+        elif event_type == 'lbaas.bandwidth.usage':
+            _send_usage(event_type, lbid, tenant_id)
+        elif event_type == 'lbaas.instance.test':
+            _send_test(event_type, lbid, tenant_id)
+        return
+
+    except Exception:
+        LOG.exception("MnB notify: unhandled exception")
+
+    LOG.error("MnB notification unsuccessful. 
Type {0}, loadbalancer {1} "
+              "tenant_id {2}".format(event_type, lbid, tenant_id))
+
+
+def _notify(service, event_type, payload):
+    priority = cfg.CONF.default_notification_level
+    publisher_id = notifier_api.publisher_id(service)
+    notifier_api.notify(None, publisher_id, event_type, priority, payload)
+
+
+def test_mnb_connection():
+    # Because the oslo notifier code does not have a return status
+    # and exceptions are caught inside oslo (I know...), the best we
+    # can do here is use the oslo rpc code to try a test connection
+    # to the MnB servers before the notification(s) are sent.
+    connected = False
+    try:
+        cx = rpc.create_connection()
+        cx.close()
+        LOG.info("Verified RPC connection is ready")
+        connected = True
+    except rpc_common.RPCException as e:
+        LOG.error("RPC connect exception: %s", e)
+    except Exception as e:
+        LOG.error("Non-RPC connect exception: %s", e)
+    return connected
+
+
+def _send_create_or_delete(event_type, lbid, tenant_id):
+
+    LOG.info(
+        "Sending {0} notification to MnB for "
+        "loadbalancer {1} tenant_id {2}".format(
+            event_type, lbid, tenant_id)
+    )
+
+    if not test_mnb_connection():
+        # Abort the notification
+        if event_type == 'lbaas.instance.create':
+            LOG.info("Aborting Create Notifications. Could not connect")
+        else:
+            LOG.info("Aborting Delete Notifications. Could not connect")
+        return
+
+    with db_session() as session:
+        lb = session.query(
+            LoadBalancer.name,
+            LoadBalancer.status,
+            LoadBalancer.created,
+            LoadBalancer.updated
+        ).filter(LoadBalancer.id == lbid).\
+            filter(LoadBalancer.tenantid == tenant_id).first()
+
+        if lb is None:
+            session.rollback()
+            LOG.error("Load Balancer {0} not found for tenant {1}".format(
+                lbid, tenant_id))
+            return
+
+        if event_type == 'lbaas.instance.create':
+            date = lb.created
+        else:
+            date = lb.updated
+
+        # Build the payload
+        payload = _build_payload(date, date, lb.name, lbid,
+                                 tenant_id, lb.status)
+
+        _notify('lbaas', event_type, payload)
+        session.commit()
+
+
+def _send_exists(event_type):
+
+    LOG.info("Sending {0} notifications to MnB".format(event_type))
+    count = 0
+    with db_session() as session:
+        lbs = session.query(
+            LoadBalancer.id,
+            LoadBalancer.tenantid,
+            LoadBalancer.name,
+            LoadBalancer.status,
+            LoadBalancer.created,
+            LoadBalancer.updated
+        ).filter(LoadBalancer.status != 'DELETED').all()
+
+        if not lbs:
+            session.rollback()
+            LOG.error("No existing Load Balancers found")
+            return
+
+        # Figure out our audit period beginning/ending
+        seconds = (cfg.CONF['admin_api'].exists_freq * 60)
+        interval = datetime.timedelta(seconds=seconds)
+        audit_period_ending = timeutils.utcnow()
+        audit_period_beginning = audit_period_ending - interval
+        audit_period_beginning = str(audit_period_beginning)
+        audit_period_ending = str(audit_period_ending)
+
+        for lb in lbs:
+            LOG.info(
+                "Sending {0} notification to MnB for "
+                "loadbalancer {1} tenant_id {2}".format(
+                    event_type, lb.id, lb.tenantid)
+            )
+
+            # Build the payload
+            payload = _build_payload(audit_period_beginning,
+                                     audit_period_ending,
+                                     lb.name, lb.id, lb.tenantid, lb.status)
+
+            _notify('lbaas', event_type, payload)
+            count += 1
+
+        session.commit()
+    LOG.info("Sent {0} {1} notifications to MnB".format(count, event_type))
+
+
+def _send_usage(event_type, start, stop):
+
+    LOG.info("Sending {0} notifications to MnB".format(event_type))
+    N = cfg.CONF['admin_api'].usage_freq
+
+    with db_session() as session:
+
+        # Start by making sure we have stats in the Stats table and
+        # track the oldest value in case we need it below.
+        oldest = session.query(Stats.period_end).\
+            order_by(Stats.id.asc()).first()
+
+        if oldest is None:
+            # No Stats at all
+            LOG.info("No usage statistics to send.")
+            session.rollback()
+            return
+
+        # Safe to unpack the single-column row now
+        oldest, = oldest
+
+        if start is None:
+            # The value in the DB must be '0000-00-00 00:00:00', so
+            # as a starting point, we can find the oldest stat in
+            # the Stats table and start from there. No sense iterating
+            # from 0000-00-00 to now looking for stats to send. Also
+            # round it back to the previous update period
+            start = _rounded_down_min(oldest, N)
+            LOG.info("Starting usage notifications from first saved {0}".
+                     format(start))
+
+        # Now that we know where to start, make sure we have stats to
+        # send for the time period. Use stats that end in this period.
+        # It's ok if the stats started in a previous period. Some skew
+        # is allowed.
+        total = session.query(Stats).\
+            filter(Stats.period_end >= start).\
+            filter(Stats.period_end < stop).\
+            count()
+        if total == 0:
+            LOG.info("No usage statistics to send between {0} and {1}"
+                     .format(start, stop))
+            session.rollback()
+            return
+
+        LOG.info("Found {0} total usage statistics to send between {1} and {2}"
+                 .format(total, start, stop))
+
+        # Get info on all of our loadbalancers for the payloads.
+        loadbalancers = _get_lbs()
+
+        # Get ready to loop through however many N-minute periods we
+        # have to send. We do it this way rather than one lump sum
+        # because finer grain data is probably needed on the MnB side.
+        end = start + datetime.timedelta(minutes=N)
+        count = 0
+        while end <= stop:
+            # Loop through all N periods up to the current period
+            # sending usage notifications to MnB
+            stats = session.query(
+                Stats.lbid,
+                func.sum(Stats.bytes_out)
+            ).group_by(Stats.lbid).\
+                filter(Stats.period_end >= start).\
+                filter(Stats.period_end < end).\
+                all()
+
+            # Prep for the next loop here in case of continue
+            prev_start = start
+            prev_end = end
+            start = end
+            end = start + datetime.timedelta(minutes=N)
+
+            if not stats:
+                LOG.info("No usage statistics to send for period {0} to {1}".
+                         format(prev_start, prev_end))
+                continue
+            else:
+                LOG.info("Sending usage statistics for {0} to {1}".
+                         format(prev_start, prev_end))
+
+            audit_period_beginning = str(prev_start)
+            audit_period_ending = str(prev_end)
+            for lb in stats:
+                lbid, byte_count = lb
+
+                if lbid not in loadbalancers:
+                    LOG.error("Loadbalancer {0} not found in DB, "
+                              "not sending usage statistics".format(lbid))
+                    continue
+
+                # Build the payload
+                payload = _build_payload(audit_period_beginning,
+                                         audit_period_ending,
+                                         loadbalancers[lbid]["name"],
+                                         lbid,
+                                         loadbalancers[lbid]["tenant_id"],
+                                         loadbalancers[lbid]["status"])
+
+                payload["metrics"] = _build_metrics(byte_count)
+
+                LOG.info(
+                    "Sending {0} notification to MnB for "
+                    "loadbalancer {1} tenant_id {2} from "
+                    "{3} to {4}: PAYLOAD = {5}".
+                    format(event_type,
+                           lbid,
+                           loadbalancers[lbid]["tenant_id"],
+                           prev_start,
+                           prev_end,
+                           payload)
+                )
+                _notify('lbaas', event_type, payload)
+                count += 1
+
+        # Purge old stats
+        if cfg.CONF['admin_api'].stats_purge_enable:
+            hours = cfg.CONF['admin_api'].stats_purge_days * 24
+            delta = datetime.timedelta(hours=hours)
+            exp = timeutils.utcnow() - delta
+            exp_time = exp.strftime('%Y-%m-%d %H:%M:%S')
+            purged = session.query(Stats).\
+                filter(Stats.period_end < exp_time).\
+                delete()
+            LOG.info("Purged {0} usage statistics from before {1}". 
+                     format(purged, exp_time))
+
+        session.commit()
+    LOG.info("Sent {0} {1} notifications to MnB".format(count, event_type))
+
+
+def _send_test(event_type, lbid, tenant_id):
+
+    now = str(timeutils.utcnow())
+    LOG.info("Sending {0} test notifications".format(lbid))
+
+    if not test_mnb_connection():
+        # Abort the test notifications
+        LOG.info("Aborting test notifications. Could not connect")
+        return
+
+    #Note lbid is the number of notifications to send
+    lbid += 1
+    for x in xrange(1, lbid):
+        payload = _build_payload(now, now, "Test LB", str(x),
+                                 str(tenant_id), 'active')
+        _notify('lbaas', 'lbaas.instance.test', payload)
+
+
+def _build_payload(begin, end, name, lbid, tenant, status):
+    return {
+        "audit_period_beginning": begin,
+        "audit_period_ending": end,
+        "display_name": name,
+        "id": lbid,
+        "type": "lbaas.std",
+        "type_id": 1,
+        "tenant_id": tenant,
+        "state": status.lower(),
+        "state_description": status.lower()
+    }
+
+
+def _build_metrics(byte_count):
+    return {
+        "metric_name": "lbaas.network.outgoing.bytes",
+        "metric_type": "gauge",
+        "metric_units": "BYTES",
+        "metric_value": byte_count
+    }
+
+
+def _rounded_down_min(ts, N):
+    ts = ts - datetime.timedelta(minutes=ts.minute % N,
+                                 seconds=ts.second,
+                                 microseconds=ts.microsecond)
+    return ts
+
+
+def _get_lbs():
+    all_lbs = {}
+    with db_session() as session:
+        lbs = session.query(
+            LoadBalancer.id,
+            LoadBalancer.tenantid,
+            LoadBalancer.name,
+            LoadBalancer.status,
+        ).all()
+
+        for lb in lbs:
+            all_lbs[lb.id] = {
+                "tenant_id": lb.tenantid,
+                "name": lb.name,
+                "status": lb.status
+            }
+        session.commit()
+    return all_lbs
diff --git a/requirements.txt b/requirements.txt
index 0f82c313..0f04ef20 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -16,3 +16,4 @@ wsme>=0.5b2
 mysql-connector-python
 ipaddress==1.0.4
 six<1.4.0
+kombu
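
[Note to reviewers: for reference, this is roughly the notification body a
single lbaas.bandwidth.usage event carries, as assembled by _build_payload()
and _build_metrics() in libra/common/api/mnb.py above. The field names come
straight from the diff; the values here are invented for illustration:

    # Illustrative payload only -- structure mirrors _build_payload()
    # plus the "metrics" key that _send_usage() adds for bandwidth events.
    payload = {
        "audit_period_beginning": "2013-11-14 15:00:00",
        "audit_period_ending": "2013-11-14 16:00:00",
        "display_name": "my-lb",
        "id": 123,
        "type": "lbaas.std",
        "type_id": 1,
        "tenant_id": "456",
        "state": "active",
        "state_description": "active",
        "metrics": {
            "metric_name": "lbaas.network.outgoing.bytes",
            "metric_type": "gauge",
            "metric_units": "BYTES",
            "metric_value": 1048576
        }
    }

The create, delete, and exists events send the same payload without the
"metrics" key. The bin/mnbtest.py script added by this patch pushes
lbaas.instance.test events through the same _notify() path and can be
pointed at a broker with etc/mnb.cfg to verify the oslo notifier
configuration end to end.]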