diff --git a/bin/mnbtest.py b/bin/mnbtest.py new file mode 100755 index 00000000..cc67fbf8 --- /dev/null +++ b/bin/mnbtest.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +############################################################################## +# Copyright (c) 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################## +import logging as std_logging +import time + +from oslo.config import cfg +from libra.openstack.common import log as logging +from libra.common.api.mnb import update_mnb +from libra import __version__ + + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + +CONF.register_opts([ + cfg.IntOpt('testcount', + metavar='COUNT', + default=1, + help='Number of messages to send') +]) + + +def main(): + CONF(project='mnbtest', version=__version__) + logging.setup('mnbtest') + LOG.debug('Configuration:') + + print "Starting Test" + print "LOG FILE = {0}".format(CONF.log_file) + LOG.info('STARTING MNBTEST') + CONF.log_opt_values(LOG, std_logging.DEBUG) + + LOG.info("Calling update_mnb with {0} messages".format(CONF.testcount)) + update_mnb('lbaas.instance.test', CONF.testcount, 456) + + time.sleep(30) + +if __name__ == "__main__": + main() diff --git a/etc/libra.cfg b/etc/libra.cfg index 72e03a7b..26faa3ba 100644 --- a/etc/libra.cfg +++ b/etc/libra.cfg @@ -24,6 +24,28 @@ #syslog_faciltiy = local7 #logstash = HOST:PORT +# Openstack +#notification_driver = openstack.common.notifier.rpc_notifier +#default_notification_level = INFO +#default_publisher_id = id +#host = localhost + +# Kombu +rabbit_use_ssl = False +#kombu_ssl_version = '' +#kombu_ssl_keyfile = '' +#kombu_ssl_certfile = '' +#kombu_ssl_ca_certs = '' +#rabbit_host = localhost +#rabbit_port = 5672 +#rabbit_userid = user +#rabbit_password = password +#rabbit_hosts = +#rabbit_virtual_host = vhost +#rabbit_retry_interval = 1 +#rabbit_retry_backoff = 2 +#rabbit_max_retries = 0 +#rabbit_ha_queues = False #----------------------------------------------------------------------- # Options for utilities that are Gearman workers or clients. @@ -110,6 +132,10 @@ nova_tenant_id = TENANTID #stats_poll_timeout = 5 #stats_poll_timeout_retry = 30 #vip_pool_size = 10 +#billing_enable = False +#exists_freq = 60 +#usage_freq = 60 +#stats_freq = 5 # Required options db_sections = mysql1 @@ -125,7 +151,6 @@ datadog_tags = service:lbaas # Others - #----------------------------------------------------------------------- # The [api] section is specific to the libra_api utility. #----------------------------------------------------------------------- @@ -149,7 +174,6 @@ ssl_certfile = certfile.crt ssl_keyfile = keyfile.key ip_filters = 192.168.0.0/24 - #----------------------------------------------------------------------- # The [mysql*] sections are referenced by admin_api and api by the # db_sections values. diff --git a/etc/mnb.cfg b/etc/mnb.cfg new file mode 100644 index 00000000..5ac9305a --- /dev/null +++ b/etc/mnb.cfg @@ -0,0 +1,39 @@ +######################################################################## +# Config for oslo notifier +######################################################################## + +[DEFAULT] +# Options to enable more verbose output +verbose = true +debug = true +use_stderr = true +publish_errors = true +logfile = /tmp/libra.log + +# Openstack +notification_driver = drivername +default_notification_level = INFO +default_publisher_id = lbaas +host = apiTest + +# Kombu +rabbit_use_ssl = True +rabbit_host = localhost +rabbit_port = 5671 +rabbit_userid = user +rabbit_password = password +#rabbit_hosts = +rabbit_virtual_host = vhost +rabbit_retry_interval = 1 +rabbit_retry_backoff = 2 +rabbit_max_retries = 0 +rabbit_ha_queues = False +fake_rabbit = False +control_exchange = exchange +amqp_durable_queues = True + +[admin_api] +billing_enable = True +exists_freq = 20 +logfile = /tmp/libra_admin.log +db_sections = '' diff --git a/libra/admin_api/__init__.py b/libra/admin_api/__init__.py index 5086614f..dc400df0 100644 --- a/libra/admin_api/__init__.py +++ b/libra/admin_api/__init__.py @@ -86,6 +86,55 @@ cfg.CONF.register_opts( cfg.IntOpt('vip_pool_size', default=10, help='Number of hot spare vips to keep in the pool'), + cfg.BoolOpt('billing_enable', + default=False, + help='Enable / Disable billing notifications'), + cfg.BoolOpt('stats_enable', + default=False, + help='Enable / Disable usage statistics gathering'), + cfg.IntOpt('exists_freq', + metavar='MINUTES', + default=60, + help='Minutes between sending of billing exists messages'), + cfg.IntOpt('usage_freq', + metavar='MINUTES', + default=60, + help='Minutes between sending of billing usage messages'), + cfg.IntOpt('stats_freq', + metavar='MINUTES', + default=5, + help='Minutes between sending STATS requests to workers'), + cfg.BoolOpt('stats_purge_enable', + default=False, + help='Enable / Disable purging of usage statistics'), + cfg.IntOpt('stats_purge_days', + metavar='DAYS', + default=5, + help='Number of days to keep usage STATS before purging'), + cfg.IntOpt('delete_timer_seconds', + default=5, + help='Which second of each minute delete timer should run'), + cfg.IntOpt('ping_timer_seconds', + default=15, + help='Second of each minute ping timer should run'), + cfg.IntOpt('stats_timer_seconds', + default=20, + help='Second of each minute stats timer should run'), + cfg.IntOpt('usage_timer_seconds', + default=25, + help='Which second of each minute usage timer should run'), + cfg.IntOpt('probe_timer_seconds', + default=30, + help='Which second of each minute probe timer should run'), + cfg.IntOpt('offline_timer_seconds', + default=45, + help='Second of each minute offline timer should run'), + cfg.IntOpt('vips_timer_seconds', + default=50, + help='Which second of each minute vips timer should run'), + cfg.IntOpt('exists_timer_seconds', + default=55, + help='Second of each minute exists timer should run'), ], group=adminapi_group ) diff --git a/libra/admin_api/app.py b/libra/admin_api/app.py index 715db454..2cda1d40 100644 --- a/libra/admin_api/app.py +++ b/libra/admin_api/app.py @@ -29,7 +29,10 @@ from eventlet import wsgi from libra import __version__ from libra.common.api import server from libra.admin_api.stats.drivers.base import known_drivers -from libra.admin_api.stats.scheduler import Stats +from libra.admin_api.stats.ping_sched import PingStats +from libra.admin_api.stats.offline_sched import OfflineStats +from libra.admin_api.stats.billing_sched import BillingStats +from libra.admin_api.stats.stats_sched import UsageStats from libra.admin_api.device_pool.manage_pool import Pool from libra.admin_api.expunge.expunge import ExpungeScheduler from libra.admin_api import config as api_config @@ -97,13 +100,27 @@ class MaintThreads(object): self.run_threads() def run_threads(self): - stats = Stats(self.drivers) + pool = Pool() - expunge = ExpungeScheduler() - self.classes.append(stats) self.classes.append(pool) + + expunge = ExpungeScheduler() self.classes.append(expunge) + pings = PingStats(self.drivers) + self.classes.append(pings) + + offline = OfflineStats(self.drivers) + self.classes.append(offline) + + if CONF['admin_api'].stats_enable: + usage = UsageStats(self.drivers) + self.classes.append(usage) + + if CONF['admin_api'].billing_enable: + billing = BillingStats(self.drivers) + self.classes.append(billing) + def exit_handler(self, signum, frame): signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) diff --git a/libra/admin_api/device_pool/manage_pool.py b/libra/admin_api/device_pool/manage_pool.py index eac7c88a..2c456b3f 100644 --- a/libra/admin_api/device_pool/manage_pool.py +++ b/libra/admin_api/device_pool/manage_pool.py @@ -31,9 +31,9 @@ LOG = log.getLogger(__name__) class Pool(object): - DELETE_SECONDS = 05 - PROBE_SECONDS = 30 - VIPS_SECONDS = 50 + DELETE_SECONDS = cfg.CONF['admin_api'].delete_timer_seconds + PROBE_SECONDS = cfg.CONF['admin_api'].probe_timer_seconds + VIPS_SECONDS = cfg.CONF['admin_api'].vips_timer_seconds def __init__(self): self.probe_timer = None diff --git a/libra/admin_api/stats/billing_sched.py b/libra/admin_api/stats/billing_sched.py new file mode 100644 index 00000000..348bcbc6 --- /dev/null +++ b/libra/admin_api/stats/billing_sched.py @@ -0,0 +1,192 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading +import datetime + +from oslo.config import cfg +from libra.common.api.lbaas import Billing, db_session +from libra.common.api.mnb import update_mnb, test_mnb_connection +from libra.openstack.common import timeutils +from libra.openstack.common import log as logging +from sqlalchemy.sql import func + + +LOG = logging.getLogger(__name__) + + +class BillingStats(object): + + EXISTS_SECONDS = cfg.CONF['admin_api'].exists_timer_seconds + USAGE_SECONDS = cfg.CONF['admin_api'].usage_timer_seconds + + def __init__(self, drivers): + self.drivers = drivers + self.usage_timer = None + self.exists_timer = None + self.server_id = cfg.CONF['admin_api']['server_id'] + self.number_of_servers = cfg.CONF['admin_api']['number_of_servers'] + self.exists_freq = cfg.CONF['admin_api'].exists_freq + self.usage_freq = cfg.CONF['admin_api'].usage_freq + self.start_usage_sched() + self.start_exists_sched() + + def shutdown(self): + if self.usage_timer: + self.usage_timer.cancel() + if self.exists_timer: + self.exists_timer.cancel() + + def update_usage(self): + # Work out if it is our turn to run + minute = datetime.datetime.now().minute + if self.server_id != minute % self.number_of_servers: + self.start_usage_sched() + return + + # Send periodic usage notifications + try: + self._exec_usage() + except Exception: + LOG.exception('Uncaught exception during billing usage update') + + # Need to restart timer after every billing cycle + self.start_usage_sched() + + def update_exists(self): + # Work out if it is our turn to run + minute = datetime.datetime.now().minute + if self.server_id != minute % self.number_of_servers: + self.start_exists_sched() + return + + # Send periodic exists notifications + try: + self._exec_exists() + except Exception: + LOG.exception('Uncaught exception during billing exists update') + + # Need to restart timer after every billing cycle + self.start_exists_sched() + + def _exec_exists(self): + with db_session() as session: + # Check if it's time to send exists notifications + delta = datetime.timedelta(minutes=self.exists_freq) + exp = timeutils.utcnow() - delta + exp_time = exp.strftime('%Y-%m-%d %H:%M:%S') + + updated = session.query( + Billing.last_update + ).filter(Billing.name == "exists").\ + filter(Billing.last_update > exp_time).\ + first() + + if updated is not None: + # Not time yet + LOG.info('Not time to send exists notifications yet {0}'. + format(exp_time)) + session.rollback() + return + + # Check the connection before sending the notifications + if not test_mnb_connection(): + # Abort the exists notifications + LOG.info("Aborting exists notifications. Could not connect") + session.rollback() + return + + #Update the exists timestamp now + session.query(Billing).\ + filter(Billing.name == "exists").\ + update({"last_update": func.now()}, + synchronize_session='fetch') + session.commit() + + # Send the notifications + update_mnb('lbaas.instance.exists', None, None) + + def _exec_usage(self): + with db_session() as session: + # Next check if it's time to send bandwidth usage notifications + delta = datetime.timedelta(minutes=self.usage_freq) + exp = timeutils.utcnow() - delta + + start, = session.query( + Billing.last_update + ).filter(Billing.name == "usage").\ + first() + + if start and start > exp: + # Not time yet + LOG.info('Not time to send usage statistics yet {0}'. + format(exp)) + session.rollback() + return + + # Check the connection before sending the notifications + if not test_mnb_connection(): + # Abort the exists notifications + LOG.info("Aborting usage notifications. Could not connect") + session.rollback() + return + + # Calculate the stopping point by rounding backward to the nearest + # N minutes. i.e. if N = 60, this will round us back to HH:00:00, + # or if N = 15, it will round us back to HH:15:00, HH:30:00, + # HH:45:00, or HH:00:00, whichever is closest. + N = cfg.CONF['admin_api'].usage_freq + now = timeutils.utcnow() + stop = now - datetime.timedelta(minutes=now.minute % N, + seconds=now.second, + microseconds=now.microsecond) + + # Release the lock + session.query(Billing).\ + filter(Billing.name == "usage").\ + update({"last_update": stop}, + synchronize_session='fetch') + session.commit() + + # Send the usage notifications. Pass the timestamps to save + # queries. + update_mnb('lbaas.bandwidth.usage', start, stop) + + def start_usage_sched(self): + # Always try to hit the expected second mark for usage + seconds = datetime.datetime.now().second + if seconds < self.USAGE_SECONDS: + sleeptime = self.USAGE_SECONDS - seconds + else: + sleeptime = 60 - (seconds - self.USAGE_SECONDS) + + LOG.info('LB usage timer sleeping for {secs} seconds' + .format(secs=sleeptime)) + self.usage_timer =\ + threading.Timer(sleeptime, self.update_usage, ()) + self.usage_timer.start() + + def start_exists_sched(self): + # Always try to hit the expected second mark for exists + seconds = datetime.datetime.now().second + if seconds < self.EXISTS_SECONDS: + sleeptime = self.EXISTS_SECONDS - seconds + else: + sleeptime = 60 - (seconds - self.EXISTS_SECONDS) + + LOG.info('LB exists timer sleeping for {secs} seconds' + .format(secs=sleeptime)) + self.exists_timer =\ + threading.Timer(sleeptime, self.update_exists, ()) + self.exists_timer.start() diff --git a/libra/admin_api/stats/offline_sched.py b/libra/admin_api/stats/offline_sched.py new file mode 100644 index 00000000..065046ed --- /dev/null +++ b/libra/admin_api/stats/offline_sched.py @@ -0,0 +1,161 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading + +from datetime import datetime +from oslo.config import cfg + +from libra.common.api.lbaas import Device, db_session +from libra.admin_api.stats.stats_gearman import GearJobs +from libra.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +class OfflineStats(object): + + OFFLINE_SECONDS = cfg.CONF['admin_api'].offline_timer_seconds + + def __init__(self, drivers): + self.drivers = drivers + self.offline_timer = None + self.ping_limit = cfg.CONF['admin_api']['stats_offline_ping_limit'] + self.error_limit = cfg.CONF['admin_api']['stats_device_error_limit'] + self.server_id = cfg.CONF['admin_api']['server_id'] + self.number_of_servers = cfg.CONF['admin_api']['number_of_servers'] + + self.start_offline_sched() + + def shutdown(self): + if self.offline_timer: + self.offline_timer.cancel() + + def check_offline_lbs(self): + # Work out if it is our turn to run + minute = datetime.now().minute + if self.server_id != minute % self.number_of_servers: + LOG.info('Not our turn to run OFFLINE check, sleeping') + self.start_offline_sched() + return + tested = 0 + failed = 0 + try: + tested, failed = self._exec_offline_check() + except Exception: + LOG.exception('Uncaught exception during OFFLINE check') + # Need to restart timer after every ping cycle + LOG.info( + '{tested} OFFLINE loadbalancers tested, {failed} failed' + .format(tested=tested, failed=failed) + ) + self.start_offline_sched() + + def _exec_offline_check(self): + tested = 0 + failed = 0 + node_list = [] + LOG.info('Running OFFLINE check') + with db_session() as session: + # Join to ensure device is in-use + devices = session.query( + Device.id, Device.name + ).filter(Device.status == 'OFFLINE').all() + + tested = len(devices) + if tested == 0: + LOG.info('No OFFLINE Load Balancers to check') + return (0, 0) + for lb in devices: + node_list.append(lb.name) + gearman = GearJobs() + failed_lbs = gearman.offline_check(node_list) + failed = len(failed_lbs) + if failed > self.error_limit: + LOG.error( + 'Too many simultaneous Load Balancer Failures.' + ' Aborting deletion attempt' + ) + return tested, failed + + if failed > 0: + self._send_delete(failed_lbs) + + # Clear the ping counts for all devices not in + # the failed list + succeeded = list(set(node_list) - set(failed_lbs)) + session.query(Device.name, Device.pingCount).\ + filter(Device.name.in_(succeeded)).\ + update({"pingCount": 0}, synchronize_session='fetch') + + session.commit() + + return tested, failed + + def _send_delete(self, failed_nodes): + with db_session() as session: + for lb in failed_nodes: + # Get the current ping count + data = session.query( + Device.id, Device.pingCount).\ + filter(Device.name == lb).first() + + if not data: + LOG.error( + 'Device {0} no longer exists'.format(data.id) + ) + continue + + if data.pingCount < self.ping_limit: + data.pingCount += 1 + LOG.error( + 'Offline Device {0} has failed {1} ping attempts'. + format(lb, data.pingCount) + ) + session.query(Device).\ + filter(Device.name == lb).\ + update({"pingCount": data.pingCount}, + synchronize_session='fetch') + session.flush() + continue + + message = ( + 'Load balancer {0} unreachable and marked for deletion'. + format(lb) + ) + for driver in self.drivers: + instance = driver() + LOG.info( + 'Sending delete request for {0} to {1}'.format( + lb, instance.__class__.__name__ + ) + ) + instance.send_delete(message, data.id) + session.commit() + + def start_offline_sched(self): + # Always try to hit the expected second mark for offline checks + seconds = datetime.now().second + if seconds < self.OFFLINE_SECONDS: + sleeptime = self.OFFLINE_SECONDS - seconds + else: + sleeptime = 60 - (seconds - self.OFFLINE_SECONDS) + + LOG.info('LB offline check timer sleeping for {secs} seconds' + .format(secs=sleeptime)) + self.offline_timer = threading.Timer( + sleeptime, self.check_offline_lbs, () + ) + self.offline_timer.start() diff --git a/libra/admin_api/stats/scheduler.py b/libra/admin_api/stats/ping_sched.py similarity index 66% rename from libra/admin_api/stats/scheduler.py rename to libra/admin_api/stats/ping_sched.py index 34884e2c..5949257b 100644 --- a/libra/admin_api/stats/scheduler.py +++ b/libra/admin_api/stats/ping_sched.py @@ -16,29 +16,20 @@ import threading from datetime import datetime from oslo.config import cfg - from libra.common.api.lbaas import LoadBalancer, Device, Node, db_session -from libra.openstack.common import log +from libra.openstack.common import log as logging from libra.admin_api.stats.stats_gearman import GearJobs - -LOG = log.getLogger(__name__) +LOG = logging.getLogger(__name__) -class NodeNotFound(Exception): - pass +class PingStats(object): - -class Stats(object): - - PING_SECONDS = 15 - OFFLINE_SECONDS = 45 + PING_SECONDS = cfg.CONF['admin_api'].ping_timer_seconds def __init__(self, drivers): self.drivers = drivers self.ping_timer = None - self.offline_timer = None - self.ping_limit = cfg.CONF['admin_api']['stats_offline_ping_limit'] self.error_limit = cfg.CONF['admin_api']['stats_device_error_limit'] self.server_id = cfg.CONF['admin_api']['server_id'] self.number_of_servers = cfg.CONF['admin_api']['number_of_servers'] @@ -46,33 +37,10 @@ class Stats(object): LOG.info("Selected stats drivers: %s", self.stats_driver) self.start_ping_sched() - self.start_offline_sched() def shutdown(self): if self.ping_timer: self.ping_timer.cancel() - if self.offline_timer: - self.offline_timer.cancel() - - def check_offline_lbs(self): - # Work out if it is our turn to run - minute = datetime.now().minute - if self.server_id != minute % self.number_of_servers: - LOG.info('Not our turn to run OFFLINE check, sleeping') - self.start_offline_sched() - return - tested = 0 - failed = 0 - try: - tested, failed = self._exec_offline_check() - except Exception: - LOG.exception('Uncaught exception during OFFLINE check') - # Need to restart timer after every ping cycle - LOG.info( - '{tested} OFFLINE loadbalancers tested, {failed} failed' - .format(tested=tested, failed=failed) - ) - self.start_offline_sched() def ping_lbs(self): # Work out if it is our turn to run @@ -126,47 +94,6 @@ class Stats(object): return pings, failed - def _exec_offline_check(self): - tested = 0 - failed = 0 - node_list = [] - LOG.info('Running OFFLINE check') - with db_session() as session: - # Join to ensure device is in-use - devices = session.query( - Device.id, Device.name - ).filter(Device.status == 'OFFLINE').all() - - tested = len(devices) - if tested == 0: - LOG.info('No OFFLINE Load Balancers to check') - return (0, 0) - for lb in devices: - node_list.append(lb.name) - gearman = GearJobs() - failed_lbs = gearman.offline_check(node_list) - failed = len(failed_lbs) - if failed > self.error_limit: - LOG.error( - 'Too many simultaneous Load Balancer Failures.' - ' Aborting deletion attempt' - ) - return tested, failed - - if failed > 0: - self._send_delete(failed_lbs) - - # Clear the ping counts for all devices not in - # the failed list - succeeded = list(set(node_list) - set(failed_lbs)) - session.query(Device.name, Device.pingCount).\ - filter(Device.name.in_(succeeded)).\ - update({"pingCount": 0}, synchronize_session='fetch') - - session.commit() - - return tested, failed - def _send_fails(self, failed_lbs): with db_session() as session: for lb in failed_lbs: @@ -196,47 +123,6 @@ class Stats(object): instance.send_alert(message, data.id) session.commit() - def _send_delete(self, failed_nodes): - with db_session() as session: - for lb in failed_nodes: - # Get the current ping count - data = session.query( - Device.id, Device.pingCount).\ - filter(Device.name == lb).first() - - if not data: - LOG.error( - 'Device {0} no longer exists'.format(data.id) - ) - continue - - if data.pingCount < self.ping_limit: - data.pingCount += 1 - LOG.error( - 'Offline Device {0} has failed {1} ping attempts'. - format(lb, data.pingCount) - ) - session.query(Device).\ - filter(Device.name == lb).\ - update({"pingCount": data.pingCount}, - synchronize_session='fetch') - session.flush() - continue - - message = ( - 'Load balancer {0} unreachable and marked for deletion'. - format(lb) - ) - for driver in self.drivers: - instance = driver() - LOG.info( - 'Sending delete request for {0} to {1}'.format( - lb, instance.__class__.__name__ - ) - ) - instance.send_delete(message, data.id) - session.commit() - def _get_lb(self, lb, session): lb = session.query( LoadBalancer.tenantid, Device.floatingIpAddr, Device.id @@ -358,17 +244,3 @@ class Stats(object): LOG.info('LB ping check timer sleeping for %d seconds', sleeptime) self.ping_timer = threading.Timer(sleeptime, self.ping_lbs, ()) self.ping_timer.start() - - def start_offline_sched(self): - # Always try to hit the expected second mark for offline checks - seconds = datetime.now().second - if seconds < self.OFFLINE_SECONDS: - sleeptime = self.OFFLINE_SECONDS - seconds - else: - sleeptime = 60 - (seconds - self.OFFLINE_SECONDS) - - LOG.info('LB offline check timer sleeping for %d seconds', sleeptime) - self.offline_timer = threading.Timer( - sleeptime, self.check_offline_lbs, () - ) - self.offline_timer.start() diff --git a/libra/admin_api/stats/stats_gearman.py b/libra/admin_api/stats/stats_gearman.py index b8df85d5..342dd15c 100644 --- a/libra/admin_api/stats/stats_gearman.py +++ b/libra/admin_api/stats/stats_gearman.py @@ -47,7 +47,7 @@ class GearJobs(object): failed_list = [] node_status = dict() retry_list = [] - job_data = {"hpcs_action": "STATS"} + job_data = {"hpcs_action": "PING"} for node in node_list: list_of_jobs.append(dict(task=str(node), data=job_data)) submitted_pings = self.gm_client.submit_multiple_jobs( @@ -143,3 +143,62 @@ class GearJobs(object): if gearman_fail > max_fail_count: failed_list.append(ping.job.task) return failed_list + + def get_stats(self, node_list): + # TODO: lots of duplicated code that needs cleanup + list_of_jobs = [] + failed_list = [] + retry_list = [] + results = {} + job_data = {"hpcs_action": "STATS"} + for node in node_list: + list_of_jobs.append(dict(task=str(node), data=job_data)) + submitted_stats = self.gm_client.submit_multiple_jobs( + list_of_jobs, background=False, wait_until_complete=True, + poll_timeout=self.poll_timeout + ) + for stats in submitted_stats: + if stats.state == JOB_UNKNOWN: + # TODO: Gearman server failed, ignoring for now + retry_list.append(stats.job.task) + if stats.timed_out: + # Timeout + retry_list.append(stats.job.task) + if stats.result['hpcs_response'] == 'FAIL': + # Error returned by Gearman + failed_list.append(stats.job.task) + else: + #Success + results[stats.job.task] = stats.result + + list_of_jobs = [] + if len(retry_list) > 0: + LOG.info( + "{0} stats timed out, retrying".format(len(retry_list)) + ) + for node in retry_list: + list_of_jobs.append(dict(task=str(node), data=job_data)) + submitted_stats = self.gm_client.submit_multiple_jobs( + list_of_jobs, background=False, wait_until_complete=True, + poll_timeout=self.poll_retry + ) + for stats in submitted_stats: + if stats.state == JOB_UNKNOWN: + # TODO: Gearman server failed, ignoring for now + LOG.error( + "Gearman Job server failed during STATS check of {0}". + format(stats.job.task) + ) + failed_list.append(stats.job.task) + if stats.timed_out: + # Timeout + failed_list.append(stats.job.task) + if stats.result['hpcs_response'] == 'FAIL': + # Error returned by Gearman + failed_list.append(stats.job.task) + continue + else: + #Success + results[stats.job.task] = stats.result + + return failed_list, results diff --git a/libra/admin_api/stats/stats_sched.py b/libra/admin_api/stats/stats_sched.py new file mode 100644 index 00000000..cee08efe --- /dev/null +++ b/libra/admin_api/stats/stats_sched.py @@ -0,0 +1,217 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading +import datetime + +from oslo.config import cfg +from libra.common.api.lbaas import LoadBalancer, Device, db_session +from libra.common.api.lbaas import Billing, Stats +from libra.admin_api.stats.stats_gearman import GearJobs +from libra.openstack.common import timeutils +from libra.openstack.common import log as logging +from sqlalchemy.sql import func + + +LOG = logging.getLogger(__name__) + + +class UsageStats(object): + + STATS_SECONDS = cfg.CONF['admin_api'].stats_timer_seconds + + def __init__(self, drivers): + self.drivers = drivers + self.stats_timer = None + self.server_id = cfg.CONF['admin_api']['server_id'] + self.number_of_servers = cfg.CONF['admin_api']['number_of_servers'] + self.stats_freq = cfg.CONF['admin_api'].stats_freq + self.billing_enable = cfg.CONF['admin_api'].billing_enable + + self.start_stats_sched() + + def shutdown(self): + if self.billing_timer: + self.billing_timer.cancel() + + def gather_stats(self): + # Work out if it is our turn to run + minute = datetime.datetime.now().minute + if self.server_id != minute % self.number_of_servers: + self.start_stats_sched() + return + total = 0 + fail = 0 + try: + fail, total = self._exec_stats() + except Exception: + LOG.exception('Uncaught exception during stats collection') + + # Need to restart timer after every stats cycle + LOG.info('{total} lb device stats queried, {fail} failed' + .format(total=total, fail=fail)) + self.start_stats_sched() + + def _exec_stats(self): + failed = 0 + node_list = [] + with db_session() as session: + delta = datetime.timedelta(minutes=self.stats_freq) + exp = timeutils.utcnow() - delta + exp_time = exp.strftime('%Y-%m-%d %H:%M:%S') + + updated = session.query( + Billing.last_update + ).filter(Billing.name == "stats").\ + filter(Billing.last_update > exp_time).\ + first() + + if updated is not None: + # Not time yet + LOG.info('Not time to gather stats yet {0}'.format(exp_time)) + session.rollback() + return 0, 0 + + #Update the stats timestamp + session.query(Billing).\ + filter(Billing.name == "stats").\ + update({"last_update": func.now()}, + synchronize_session='fetch') + + # Get all the online devices to query for stats + devices = session.query( + Device.id, Device.name + ).filter(Device.status == 'ONLINE').all() + + if devices is None or len(devices) == 0: + LOG.error('No ONLINE devices to gather usage stats from') + session.rollback() + return 0, 0 + total = len(devices) + + for device in devices: + node_list.append(device.name) + gearman = GearJobs() + failed_list, results = gearman.get_stats(node_list) + failed = len(failed_list) + + if failed > 0: + self._send_fails(failed_list) + + if total > failed: + # We have some success + self._update_stats(results, failed_list) + session.commit() + else: + # Everything failed. Retry these on the next timer firing + session.rollback() + + return failed, total + + def _update_stats(self, results, failed_list): + with db_session() as session: + lbs = session.query( + LoadBalancer.id, + LoadBalancer.protocol, + LoadBalancer.status, + Device.name + ).join(LoadBalancer.devices).\ + filter(Device.status == 'ONLINE').all() + + if lbs is None: + session.rollback() + LOG.error('No Loadbalancers found when updating stats') + return + + total = len(lbs) + added = 0 + for lb in lbs: + if lb.name not in results: + if lb.name not in failed_list: + LOG.error( + 'No stats results found for Device {0}, LBID {1}' + .format(lb.name, lb.id)) + continue + + result = results[lb.name] + protocol = lb.protocol.lower() + if protocol != "http": + # GALERA or TCP = TCP at the worker + protocol = "tcp" + + bytes_out = -1 + for data in result["loadBalancers"]: + if data["protocol"] == protocol: + bytes_out = data["bytes_out"] + + if bytes_out == -1: + LOG.error( + 'No stats found for Device {0}, ' + 'LBID {1}, protocol {2}' + .format(lb.name, lb.id, protocol)) + continue + + new_entry = Stats() + new_entry.lbid = lb.id + new_entry.period_start = result["utc_start"] + new_entry.period_end = result["utc_end"] + new_entry.bytes_out = bytes_out + new_entry.status = lb.status + session.add(new_entry) + session.flush + added += 1 + session.commit() + LOG.info( + '{total} loadbalancers stats queried, {fail} failed' + .format(total=total, fail=total - added)) + + def _send_fails(self, failed_list): + with db_session() as session: + for device_name in failed_list: + data = self._get_lb(device_name, session) + if not data: + LOG.error( + 'Device {0} has no Loadbalancer attached during STATS'. + format(device_name) + ) + continue + + LOG.error( + 'Load balancer failed STATS request ' + 'ID: {0}\n' + 'IP: {1}\n' + 'tenant: {2}\n'.format( + data.id, data.floatingIpAddr, + data.tenantid)) + + def _get_lb(self, lb, session): + lb = session.query( + LoadBalancer.tenantid, Device.floatingIpAddr, Device.id + ).join(LoadBalancer.devices).\ + filter(Device.name == lb).first() + + return lb + + def start_stats_sched(self): + # Always try to hit the expected second mark for stats + seconds = datetime.datetime.now().second + if seconds < self.STATS_SECONDS: + sleeptime = self.STATS_SECONDS - seconds + else: + sleeptime = 60 - (seconds - self.STATS_SECONDS) + + LOG.info('LB stats timer sleeping for {secs} seconds' + .format(secs=sleeptime)) + self.stats_timer = threading.Timer(sleeptime, self.gather_stats, ()) + self.stats_timer.start() diff --git a/libra/api/controllers/load_balancers.py b/libra/api/controllers/load_balancers.py index 92b01082..fb95ad81 100644 --- a/libra/api/controllers/load_balancers.py +++ b/libra/api/controllers/load_balancers.py @@ -33,6 +33,7 @@ from libra.common.exc import ExhaustedError from libra.api.model.validators import LBPut, LBPost, LBResp, LBVipResp from libra.api.model.validators import LBRespNode from libra.common.api.gearman_client import submit_job +from libra.common.api.mnb import update_mnb from libra.api.acl import get_limited_to_project from libra.api.library.exp import OverLimit, IPOutOfRange, NotFound from libra.api.library.exp import ImmutableEntity, ImmutableStates @@ -469,6 +470,9 @@ class LoadBalancersController(RestController): 'UPDATE', device.name, device.id, lb.id ) + #Notify billing of the LB creation + update_mnb('lbaas.instance.create', lb.id, tenant_id) + return return_data @wsme_pecan.wsexpose(None, body=LBPut, status_code=202) @@ -579,6 +583,9 @@ class LoadBalancersController(RestController): submit_job( 'DELETE', device.name, device.id, lb.id ) + + #Notify billing of the LB deletion + update_mnb('lbaas.instance.delete', lb.id, tenant_id) return None def usage(self, load_balancer_id): diff --git a/libra/common/api/gearman_client.py b/libra/common/api/gearman_client.py index 58518cbf..c4faaad7 100644 --- a/libra/common/api/gearman_client.py +++ b/libra/common/api/gearman_client.py @@ -33,7 +33,8 @@ gearman_workers = [ 'DELETE', # Delete a Load Balancer. 'DISCOVER', # Return service discovery information. 'ARCHIVE', # Archive LB log files. - 'STATS' # Get load balancer statistics. + 'STATS', # Get load balancer statistics. + 'PING' # Ping load balancers ] diff --git a/libra/common/api/lbaas.py b/libra/common/api/lbaas.py index 9ec1790c..0f2e4d27 100644 --- a/libra/common/api/lbaas.py +++ b/libra/common/api/lbaas.py @@ -20,7 +20,7 @@ import time from oslo.config import cfg from pecan import conf from sqlalchemy import Table, Column, Integer, ForeignKey, create_engine -from sqlalchemy import INTEGER, VARCHAR, BIGINT +from sqlalchemy import INTEGER, VARCHAR, BIGINT, DATETIME from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship, backref, sessionmaker, Session @@ -151,6 +151,28 @@ class HealthMonitor(DeclarativeBase): path = Column(u'path', VARCHAR(length=2000)) +class Billing(DeclarativeBase): + __tablename__ = 'billing' + id = Column(u'id', Integer, primary_key=True, nullable=False) + name = Column(u'name', VARCHAR(length=128), nullable=False) + last_update = Column(u'last_update', DATETIME(), nullable=False) + + +class Stats(DeclarativeBase): + """stats model""" + __tablename__ = 'stats' + #column definitions + id = Column(u'id', BIGINT(), primary_key=True, nullable=False) + lbid = Column( + u'lbid', BIGINT(), ForeignKey('loadbalancers.id'), primary_key=True, + nullable=False + ) + period_start = Column(u'period_start', DATETIME(), nullable=False) + period_end = Column(u'period_end', DATETIME(), nullable=False) + bytes_out = Column(u'bytes_out', BIGINT(), nullable=False) + status = Column(u'status', VARCHAR(length=50), nullable=False) + + class RoutingSession(Session): """ Try to use the first engine provided. If this fails use the next in sequence and so on. Reset to the first after 60 seconds diff --git a/libra/common/api/lbaas.sql b/libra/common/api/lbaas.sql index 7c4987d3..3f49e371 100644 --- a/libra/common/api/lbaas.sql +++ b/libra/common/api/lbaas.sql @@ -72,7 +72,7 @@ CREATE TABLE monitors ( timeout INT NOT NULL, # Maximum number of seconds to wait for a connection to the node before it times out. attemptsBeforeDeactivation INT NOT NULL, # Number of permissible failures before removing a node from rotation. 1 to 10. path VARCHAR(2000) NULL, # The HTTP path used in the request by the monitor. Begins with / - PRIMARY KEY (lbid) # ids are unique accross all Nodes + PRIMARY KEY (lbid) # ids are unique across all Nodes ) DEFAULT CHARSET utf8 DEFAULT COLLATE utf8_general_ci; CREATE TABLE `pool_building` ( @@ -100,3 +100,24 @@ CREATE TABLE `global_limits` ( INSERT INTO `global_limits` VALUES (1,'maxLoadBalancerNameLength',128),(2,'maxVIPsPerLoadBalancer',1),(3,'maxNodesPerLoadBalancer',50),(4,'maxLoadBalancers',20); +# Billing +CREATE TABLE billing ( + id int(11) NOT NULL, + name varchar(128) NOT NULL, + last_update DATETIME NOT NULL DEFAULT '0000-00-00 00:00:00', # timestamp of when the feature was last updated + PRIMARY KEY (id) +) ENGINE=InnoDB DEFAULT CHARSET latin1; + +INSERT INTO billing VALUES (1, 'stats', '0000-00-00 00:00:00'),(2, 'usage', '0000-00-00 00:00:00'),(3, 'exists', '0000-00-00 00:00:00'); + +# Stats +CREATE TABLE stats ( + id BIGINT NOT NULL AUTO_INCREMENT, # unique id for this billing record + lbid BIGINT NOT NULL REFERENCES loadblancers(id), # fk for lbid + period_start DATETIME NOT NULL, # timestamp of when this period started + period_end DATETIME NOT NULL, # timestamp of when this period ended + bytes_out BIGINT NOT NULL, # bytes transferred in this period + status VARCHAR(50) NOT NULL, # Current LB status + PRIMARY KEY (id) # ids are unique across all LBs + ) ENGINE=InnoDB DEFAULT CHARSET latin1; + \ No newline at end of file diff --git a/libra/common/api/mnb.py b/libra/common/api/mnb.py new file mode 100644 index 00000000..d495d3f3 --- /dev/null +++ b/libra/common/api/mnb.py @@ -0,0 +1,367 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import eventlet +eventlet.monkey_patch() + +from oslo.config import cfg +from libra.common.api.lbaas import LoadBalancer, db_session +from libra.common.api.lbaas import Stats +from libra.openstack.common.notifier import api as notifier_api +from libra.openstack.common import timeutils +from libra.openstack.common import log as logging +from libra.openstack.common import rpc +from libra.openstack.common.rpc import common as rpc_common +from sqlalchemy.sql import func + + +LOG = logging.getLogger(__name__) + + +def update_mnb(event_type, lbid, tenant_id): + # Start a new thread + eventlet.spawn_n(client_job, event_type, lbid, tenant_id) + + +def client_job(event_type, lbid, tenant_id): + + try: + if(event_type == 'lbaas.instance.create' or + event_type == 'lbaas.instance.delete'): + _send_create_or_delete(event_type, lbid, tenant_id) + elif event_type == 'lbaas.instance.exists': + _send_exists(event_type) + elif event_type == 'lbaas.bandwidth.usage': + _send_usage(event_type, lbid, tenant_id) + elif event_type == 'lbaas.instance.test': + _send_test(event_type, lbid, tenant_id) + return + + except: + LOG.exception("MnB notify: unhandled exception") + + LOG.error("MnB notification unsuccessful. Type {0}, loadbalancer {1} " + "tenant_id {2}".format(event_type, lbid, tenant_id)) + + +def _notify(service, event_type, payload): + priority = cfg.CONF.default_notification_level + publisher_id = notifier_api.publisher_id(service) + notifier_api.notify(None, publisher_id, event_type, priority, payload) + + +def test_mnb_connection(): + # Because the oslo notifier code does not have a return status + # and exceptions are caught inside oslo (I know...), the best we + # can do here is use the oslo rpc code to try a test connection + # to the MnB servers before the notification(s) are sent. + connected = False + try: + cx = rpc.create_connection() + cx.close() + LOG.info("Verified RPC connection is ready") + connected = True + except rpc_common.RPCException as e: + LOG.error("RPC connect exception: %s", e) + except Exception as e: + LOG.error("Non-RPC connect exception: %s", e) + return connected + + +def _send_create_or_delete(event_type, lbid, tenant_id): + + LOG.info( + "Sending MnB {0} notification to MnB for " + "loadbalancer {1} tenant_id {2}".format( + event_type, lbid, tenant_id) + ) + + if not test_mnb_connection(): + # Abort the notification + if event_type == 'lbaas.instance.create': + LOG.info("Aborting Create Notifications. Could not connect") + else: + LOG.info("Aborting Delete Notifications. Could not connect") + return + + with db_session() as session: + lb = session.query( + LoadBalancer.name, + LoadBalancer.status, + LoadBalancer.created, + LoadBalancer.updated + ).filter(LoadBalancer.id == lbid).\ + filter(LoadBalancer.tenantid == tenant_id).first() + + if lb is None: + session.rollback() + LOG.error("Load Balancer {0} not found for tenant {1}".format( + lbid, tenant_id)) + return + + if event_type == 'lbaas.instance.create': + date = lb.created + else: + date = lb.updated + + # Build the payload + payload = _build_payload(date, date, lb.name, lbid, + tenant_id, lb.status) + + _notify('lbaas', event_type, payload) + session.commit() + + +def _send_exists(event_type): + + LOG.info("Sending MnB {0} notifications to MnB".format(event_type)) + count = 0 + with db_session() as session: + lbs = session.query( + LoadBalancer.id, + LoadBalancer.tenantid, + LoadBalancer.name, + LoadBalancer.status, + LoadBalancer.created, + LoadBalancer.updated + ).filter(LoadBalancer.status != 'DELETED').all() + + if not lbs: + session.rollback() + LOG.error("No existing Load Balancers found") + return + + # Figure out our audit period beging/ending + seconds = (cfg.CONF['admin_api'].exists_freq * 60) + interval = datetime.timedelta(seconds=seconds) + audit_period_ending = timeutils.utcnow() + audit_period_beginning = audit_period_ending - interval + audit_period_beginning = str(audit_period_beginning) + audit_period_ending = str(audit_period_ending) + + for lb in lbs: + LOG.info( + "Sending MnB {0} notification to MnB for " + "loadbalancer {1} tenant_id {2}".format( + event_type, lb.id, lb.tenantid) + ) + + # Build the payload + payload = _build_payload(audit_period_beginning, + audit_period_ending, + lb.name, lb.id, lb.tenantid, lb.status) + + _notify('lbaas', event_type, payload) + count += 1 + + session.commit() + LOG.info("Sent {0} MnB {1} notifications to MnB".format(count, event_type)) + + +def _send_usage(event_type, start, stop): + + LOG.info("Sending MnB {0} notifications to MnB".format(event_type)) + N = cfg.CONF['admin_api'].usage_freq + + with db_session() as session: + + # Start by making sure we have stats in the Stats table and + # track the oldest value in case we need it below. + oldest, = session.query(Stats.period_end).\ + order_by(Stats.id.asc()).first() + + if oldest is None: + # No Stats at all + LOG.info("No usage statistics to send.") + session.rollback() + return + + if start is None: + # The value in the DB must be '0000-00-00 00:00:00 so + # as a starting point, we can find the oldest stat in + # the Stats table and start from there. No sense iterating + # from 0000-00-00 to now looking for stats to send. Also + # round it back to the previous update period + start = _rounded_down_min(oldest, N) + LOG.info("Starting usage notifications from first saved {0}". + format(start)) + + # Now that we know where to start, make sure we have stats to + # send for the time period. Use stats that end in this period. + # It's ok if the stats started in a previous period. Some skew + # is allowed. + total = session.query(Stats).\ + filter(Stats.period_end >= start).\ + filter(Stats.period_end < stop).\ + count() + if total == 0: + LOG.info("No usage statistics to send between {0} and {1}" + .format(start, stop)) + session.rollback() + return + + LOG.info("Found {0} total usage statistics to send between {1} and {2}" + .format(total, start, stop)) + + # Get info on all of our loadbalancers for the payloads. + loadbalancers = _get_lbs() + + # Get ready to loop through however N minute periods we + # have to send. We do it this way rather than one lump sum + # because finer grain data is probably needed on the MnB side. + end = start + datetime.timedelta(minutes=N) + count = 0 + while end <= stop: + # Loop through all N periods up to the current period + # sending usage notifications to MnB + stats = session.query( + Stats.lbid, + func.sum(Stats.bytes_out) + ).group_by(Stats.lbid).\ + filter(Stats.period_end >= start).\ + filter(Stats.period_end < end).\ + all() + + # Prep for the next loop here in case of continue + prev_start = start + prev_end = end + start = end + end = start + datetime.timedelta(minutes=N) + + if not stats: + LOG.info("No usage statistics to send for period {0} to {1}". + format(prev_start, prev_end)) + continue + else: + LOG.info("Sending usage statistics for {0} to {1}". + format(prev_start, prev_end)) + + audit_period_beginning = str(prev_start) + audit_period_ending = str(prev_end) + for lb in stats: + lbid, byte_count = lb + + if lbid not in loadbalancers: + LOG.error("Loadbalancer {0} not found in DB " + "not sending usage statistics".format(lbid)) + continue + + # Build the payload + payload = _build_payload(audit_period_beginning, + audit_period_ending, + loadbalancers[lbid]["name"], + lbid, + loadbalancers[lbid]["tenant_id"], + loadbalancers[lbid]["status"]) + + payload["metrics"] = _build_metrics(byte_count) + + LOG.info( + "Sending MnB {0} notification to MnB for " + "loadbalancer {1} tenant_id {2} from " + "{3} to {4}: PAYLOAD = {5}". + format(event_type, + lbid, + loadbalancers[lbid]["tenant_id"], + prev_start, + prev_end, + payload) + ) + _notify('lbaas', event_type, payload) + count += 1 + + # Purge old stats + if cfg.CONF['admin_api'].stats_purge_enable: + hours = cfg.CONF['admin_api'].stats_purge_days * 24 + delta = datetime.timedelta(hours=hours) + exp = timeutils.utcnow() - delta + exp_time = exp.strftime('%Y-%m-%d %H:%M:%S') + purged = session.query(Stats).\ + filter(Stats.period_end < exp_time).\ + delete() + LOG.info("Purged {0} usage statistics from before {1}". + format(purged, exp_time)) + + session.commit() + LOG.info("Sent {0} MnB {1} notifications to MnB".format(count, event_type)) + + +def _send_test(event_type, lbid, tenant_id): + + # Build the payload + now = str(timeutils.utcnow()) + LOG.error("Sending {0} test notifications".format(lbid)) + + if not test_mnb_connection(): + # Abort the test notifications + LOG.info("Aborting test Notifications. Could not connect") + return + + #Note lbid is the number of notifications to send + lbid += 1 + for x in xrange(1, lbid): + payload = _build_payload(now, now, "Test LB", str(x), + str(tenant_id), 'active') + _notify('lbaas', 'lbaas.instance.test', payload) + + +def _build_payload(begin, end, name, id, tenant, status): + return { + "audit_period_beginning": begin, + "audit_period_ending": end, + "display_name": name, + "id": id, + "type": "lbaas.std", + "type_id": 1, + "tenant_id": tenant, + "state": status.lower(), + "state_description": status.lower() + } + + +def _build_metrics(bytes): + return { + "metric_name": "lbaas.network.outgoing.bytes", + "metric_type": "gauge", + "metric_units": "BYTES", + "metric_value": bytes + } + + +def _rounded_down_min(ts, N): + ts = ts - datetime.timedelta(minutes=ts.minute % N, + seconds=ts.second, + microseconds=ts.microsecond) + return ts + + +def _get_lbs(): + all_lbs = {} + with db_session() as session: + lbs = session.query( + LoadBalancer.id, + LoadBalancer.tenantid, + LoadBalancer.name, + LoadBalancer.status, + ).all() + + for lb in lbs: + all_lbs[lb.id] = { + "tenant_id": lb.tenantid, + "name": lb.name, + "status": lb.status + } + session.commit() + return all_lbs diff --git a/requirements.txt b/requirements.txt index 0f82c313..0f04ef20 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,4 @@ wsme>=0.5b2 mysql-connector-python ipaddress==1.0.4 six<1.4.0 +kombu