[ADMIN_API] API side of MnB changes

These changes contain most of the API side of the MnB (metering and
billing) work that is needed. This is still a work in progress; it does
not yet contain the MnB code for sending usage notifications.
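
In rough terms, the flow added here is: the API controllers and the
admin API schedulers call update_mnb() (libra/common/api/mnb.py below),
which spawns a green thread and publishes an oslo notification over the
configured RabbitMQ/Kombu transport. A minimal sketch of the call
pattern, with placeholder ids (the real calls are in the loadbalancers
controller and the billing scheduler):

    from libra.common.api.mnb import update_mnb

    # After a load balancer create or delete completes in the API:
    update_mnb('lbaas.instance.create', 123, 'example-tenant-id')
    update_mnb('lbaas.instance.delete', 123, 'example-tenant-id')

    # From the admin API billing scheduler; exists needs no arguments,
    # usage passes the start/stop of the reporting period instead:
    update_mnb('lbaas.instance.exists', None, None)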

Change-Id: I25ca671f65272709480f9788fa6d671eabe06046
Author: Marc Pilon
Date:   2013-11-14 16:50:04 -05:00
Parent: 615baa7af6
Commit: dd086ee5f9
17 changed files with 1248 additions and 145 deletions

bin/mnbtest.py Executable file

@@ -0,0 +1,54 @@
#!/usr/bin/env python
##############################################################################
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
##############################################################################
import logging as std_logging
import time
from oslo.config import cfg
from libra.openstack.common import log as logging
from libra.common.api.mnb import update_mnb
from libra import __version__
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
CONF.register_opts([
cfg.IntOpt('testcount',
metavar='COUNT',
default=1,
help='Number of messages to send')
])
def main():
CONF(project='mnbtest', version=__version__)
logging.setup('mnbtest')
LOG.debug('Configuration:')
print "Starting Test"
print "LOG FILE = {0}".format(CONF.log_file)
LOG.info('STARTING MNBTEST')
CONF.log_opt_values(LOG, std_logging.DEBUG)
LOG.info("Calling update_mnb with {0} messages".format(CONF.testcount))
update_mnb('lbaas.instance.test', CONF.testcount, 456)
time.sleep(30)
if __name__ == "__main__":
main()
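
Assuming standard oslo.config behaviour, the tool reads testcount from
the [DEFAULT] section of its config file (it is registered as a regular
option, not a CLI option), so a run looks roughly like
bin/mnbtest.py --config-file etc/mnb.cfg with testcount = 5 set in that
file; it then sends five lbaas.instance.test notifications and sleeps
long enough for the spawned green thread to flush them.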


@@ -24,6 +24,28 @@
#syslog_facility = local7
#logstash = HOST:PORT
# Openstack
#notification_driver = openstack.common.notifier.rpc_notifier
#default_notification_level = INFO
#default_publisher_id = id
#host = localhost
# Kombu
rabbit_use_ssl = False
#kombu_ssl_version = ''
#kombu_ssl_keyfile = ''
#kombu_ssl_certfile = ''
#kombu_ssl_ca_certs = ''
#rabbit_host = localhost
#rabbit_port = 5672
#rabbit_userid = user
#rabbit_password = password
#rabbit_hosts =
#rabbit_virtual_host = vhost
#rabbit_retry_interval = 1
#rabbit_retry_backoff = 2
#rabbit_max_retries = 0
#rabbit_ha_queues = False
#-----------------------------------------------------------------------
# Options for utilities that are Gearman workers or clients.
@@ -110,6 +132,10 @@ nova_tenant_id = TENANTID
#stats_poll_timeout = 5
#stats_poll_timeout_retry = 30
#vip_pool_size = 10
#billing_enable = False
#exists_freq = 60
#usage_freq = 60
#stats_freq = 5
# Required options
db_sections = mysql1
@@ -125,7 +151,6 @@ datadog_tags = service:lbaas
# Others
#-----------------------------------------------------------------------
# The [api] section is specific to the libra_api utility.
#-----------------------------------------------------------------------
@@ -149,7 +174,6 @@ ssl_certfile = certfile.crt
ssl_keyfile = keyfile.key
ip_filters = 192.168.0.0/24
#-----------------------------------------------------------------------
# The [mysql*] sections are referenced by admin_api and api by the
# db_sections values.

etc/mnb.cfg Normal file

@@ -0,0 +1,39 @@
########################################################################
# Config for oslo notifier
########################################################################
[DEFAULT]
# Options to enable more verbose output
verbose = true
debug = true
use_stderr = true
publish_errors = true
logfile = /tmp/libra.log
# Openstack
notification_driver = drivername
default_notification_level = INFO
default_publisher_id = lbaas
host = apiTest
# Kombu
rabbit_use_ssl = True
rabbit_host = localhost
rabbit_port = 5671
rabbit_userid = user
rabbit_password = password
#rabbit_hosts =
rabbit_virtual_host = vhost
rabbit_retry_interval = 1
rabbit_retry_backoff = 2
rabbit_max_retries = 0
rabbit_ha_queues = False
fake_rabbit = False
control_exchange = exchange
amqp_durable_queues = True
[admin_api]
billing_enable = True
exists_freq = 20
logfile = /tmp/libra_admin.log
db_sections = ''
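
The settings above can be exercised directly through the same oslo
notifier API that libra/common/api/mnb.py uses; a minimal sketch,
assuming the file is readable at etc/mnb.cfg, notification_driver
points at a real driver (e.g. the rpc_notifier mentioned in the sample
config above), and a broker matching the Kombu settings is reachable:

    from oslo.config import cfg
    from libra.openstack.common.notifier import api as notifier_api

    # Load the notifier/Kombu options from the config shown above.
    cfg.CONF(['--config-file', 'etc/mnb.cfg'], project='libra')

    # Publish one throwaway notification with the configured driver,
    # publisher id and notification level.
    notifier_api.notify(None,
                        notifier_api.publisher_id('lbaas'),
                        'lbaas.instance.test',
                        cfg.CONF.default_notification_level,
                        {'display_name': 'Test LB'})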


@@ -86,6 +86,55 @@ cfg.CONF.register_opts(
cfg.IntOpt('vip_pool_size',
default=10,
help='Number of hot spare vips to keep in the pool'),
cfg.BoolOpt('billing_enable',
default=False,
help='Enable / Disable billing notifications'),
cfg.BoolOpt('stats_enable',
default=False,
help='Enable / Disable usage statistics gathering'),
cfg.IntOpt('exists_freq',
metavar='MINUTES',
default=60,
help='Minutes between sending of billing exists messages'),
cfg.IntOpt('usage_freq',
metavar='MINUTES',
default=60,
help='Minutes between sending of billing usage messages'),
cfg.IntOpt('stats_freq',
metavar='MINUTES',
default=5,
help='Minutes between sending STATS requests to workers'),
cfg.BoolOpt('stats_purge_enable',
default=False,
help='Enable / Disable purging of usage statistics'),
cfg.IntOpt('stats_purge_days',
metavar='DAYS',
default=5,
help='Number of days to keep usage STATS before purging'),
cfg.IntOpt('delete_timer_seconds',
default=5,
help='Which second of each minute delete timer should run'),
cfg.IntOpt('ping_timer_seconds',
default=15,
help='Second of each minute ping timer should run'),
cfg.IntOpt('stats_timer_seconds',
default=20,
help='Second of each minute stats timer should run'),
cfg.IntOpt('usage_timer_seconds',
default=25,
help='Which second of each minute usage timer should run'),
cfg.IntOpt('probe_timer_seconds',
default=30,
help='Which second of each minute probe timer should run'),
cfg.IntOpt('offline_timer_seconds',
default=45,
help='Second of each minute offline timer should run'),
cfg.IntOpt('vips_timer_seconds',
default=50,
help='Which second of each minute vips timer should run'),
cfg.IntOpt('exists_timer_seconds',
default=55,
help='Second of each minute exists timer should run'),
],
group=adminapi_group
)


@@ -29,7 +29,10 @@ from eventlet import wsgi
from libra import __version__
from libra.common.api import server
from libra.admin_api.stats.drivers.base import known_drivers
from libra.admin_api.stats.scheduler import Stats
from libra.admin_api.stats.ping_sched import PingStats
from libra.admin_api.stats.offline_sched import OfflineStats
from libra.admin_api.stats.billing_sched import BillingStats
from libra.admin_api.stats.stats_sched import UsageStats
from libra.admin_api.device_pool.manage_pool import Pool
from libra.admin_api.expunge.expunge import ExpungeScheduler
from libra.admin_api import config as api_config
@@ -97,13 +100,27 @@ class MaintThreads(object):
self.run_threads()
def run_threads(self):
stats = Stats(self.drivers)
pool = Pool()
expunge = ExpungeScheduler()
self.classes.append(stats)
self.classes.append(pool)
expunge = ExpungeScheduler()
self.classes.append(expunge)
pings = PingStats(self.drivers)
self.classes.append(pings)
offline = OfflineStats(self.drivers)
self.classes.append(offline)
if CONF['admin_api'].stats_enable:
usage = UsageStats(self.drivers)
self.classes.append(usage)
if CONF['admin_api'].billing_enable:
billing = BillingStats(self.drivers)
self.classes.append(billing)
def exit_handler(self, signum, frame):
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)


@@ -31,9 +31,9 @@ LOG = log.getLogger(__name__)
class Pool(object):
DELETE_SECONDS = 05
PROBE_SECONDS = 30
VIPS_SECONDS = 50
DELETE_SECONDS = cfg.CONF['admin_api'].delete_timer_seconds
PROBE_SECONDS = cfg.CONF['admin_api'].probe_timer_seconds
VIPS_SECONDS = cfg.CONF['admin_api'].vips_timer_seconds
def __init__(self):
self.probe_timer = None


@@ -0,0 +1,192 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import datetime
from oslo.config import cfg
from libra.common.api.lbaas import Billing, db_session
from libra.common.api.mnb import update_mnb, test_mnb_connection
from libra.openstack.common import timeutils
from libra.openstack.common import log as logging
from sqlalchemy.sql import func
LOG = logging.getLogger(__name__)
class BillingStats(object):
EXISTS_SECONDS = cfg.CONF['admin_api'].exists_timer_seconds
USAGE_SECONDS = cfg.CONF['admin_api'].usage_timer_seconds
def __init__(self, drivers):
self.drivers = drivers
self.usage_timer = None
self.exists_timer = None
self.server_id = cfg.CONF['admin_api']['server_id']
self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
self.exists_freq = cfg.CONF['admin_api'].exists_freq
self.usage_freq = cfg.CONF['admin_api'].usage_freq
self.start_usage_sched()
self.start_exists_sched()
def shutdown(self):
if self.usage_timer:
self.usage_timer.cancel()
if self.exists_timer:
self.exists_timer.cancel()
def update_usage(self):
# Work out if it is our turn to run
minute = datetime.datetime.now().minute
if self.server_id != minute % self.number_of_servers:
self.start_usage_sched()
return
# Send periodic usage notifications
try:
self._exec_usage()
except Exception:
LOG.exception('Uncaught exception during billing usage update')
# Need to restart timer after every billing cycle
self.start_usage_sched()
def update_exists(self):
# Work out if it is our turn to run
minute = datetime.datetime.now().minute
if self.server_id != minute % self.number_of_servers:
self.start_exists_sched()
return
# Send periodic exists notifications
try:
self._exec_exists()
except Exception:
LOG.exception('Uncaught exception during billing exists update')
# Need to restart timer after every billing cycle
self.start_exists_sched()
def _exec_exists(self):
with db_session() as session:
# Check if it's time to send exists notifications
delta = datetime.timedelta(minutes=self.exists_freq)
exp = timeutils.utcnow() - delta
exp_time = exp.strftime('%Y-%m-%d %H:%M:%S')
updated = session.query(
Billing.last_update
).filter(Billing.name == "exists").\
filter(Billing.last_update > exp_time).\
first()
if updated is not None:
# Not time yet
LOG.info('Not time to send exists notifications yet {0}'.
format(exp_time))
session.rollback()
return
# Check the connection before sending the notifications
if not test_mnb_connection():
# Abort the exists notifications
LOG.info("Aborting exists notifications. Could not connect")
session.rollback()
return
#Update the exists timestamp now
session.query(Billing).\
filter(Billing.name == "exists").\
update({"last_update": func.now()},
synchronize_session='fetch')
session.commit()
# Send the notifications
update_mnb('lbaas.instance.exists', None, None)
def _exec_usage(self):
with db_session() as session:
# Next check if it's time to send bandwidth usage notifications
delta = datetime.timedelta(minutes=self.usage_freq)
exp = timeutils.utcnow() - delta
start, = session.query(
Billing.last_update
).filter(Billing.name == "usage").\
first()
if start and start > exp:
# Not time yet
LOG.info('Not time to send usage statistics yet {0}'.
format(exp))
session.rollback()
return
# Check the connection before sending the notifications
if not test_mnb_connection():
# Abort the exists notifications
LOG.info("Aborting usage notifications. Could not connect")
session.rollback()
return
# Calculate the stopping point by rounding backward to the nearest
# N minutes. i.e. if N = 60, this will round us back to HH:00:00,
# or if N = 15, it will round us back to HH:15:00, HH:30:00,
# HH:45:00, or HH:00:00, whichever is closest.
N = cfg.CONF['admin_api'].usage_freq
now = timeutils.utcnow()
stop = now - datetime.timedelta(minutes=now.minute % N,
seconds=now.second,
microseconds=now.microsecond)
# Release the lock
session.query(Billing).\
filter(Billing.name == "usage").\
update({"last_update": stop},
synchronize_session='fetch')
session.commit()
# Send the usage notifications. Pass the timestamps to save
# queries.
update_mnb('lbaas.bandwidth.usage', start, stop)
def start_usage_sched(self):
# Always try to hit the expected second mark for usage
seconds = datetime.datetime.now().second
if seconds < self.USAGE_SECONDS:
sleeptime = self.USAGE_SECONDS - seconds
else:
sleeptime = 60 - (seconds - self.USAGE_SECONDS)
LOG.info('LB usage timer sleeping for {secs} seconds'
.format(secs=sleeptime))
self.usage_timer =\
threading.Timer(sleeptime, self.update_usage, ())
self.usage_timer.start()
def start_exists_sched(self):
# Always try to hit the expected second mark for exists
seconds = datetime.datetime.now().second
if seconds < self.EXISTS_SECONDS:
sleeptime = self.EXISTS_SECONDS - seconds
else:
sleeptime = 60 - (seconds - self.EXISTS_SECONDS)
LOG.info('LB exists timer sleeping for {secs} seconds'
.format(secs=sleeptime))
self.exists_timer =\
threading.Timer(sleeptime, self.update_exists, ())
self.exists_timer.start()


@@ -0,0 +1,161 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
from datetime import datetime
from oslo.config import cfg
from libra.common.api.lbaas import Device, db_session
from libra.admin_api.stats.stats_gearman import GearJobs
from libra.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class OfflineStats(object):
OFFLINE_SECONDS = cfg.CONF['admin_api'].offline_timer_seconds
def __init__(self, drivers):
self.drivers = drivers
self.offline_timer = None
self.ping_limit = cfg.CONF['admin_api']['stats_offline_ping_limit']
self.error_limit = cfg.CONF['admin_api']['stats_device_error_limit']
self.server_id = cfg.CONF['admin_api']['server_id']
self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
self.start_offline_sched()
def shutdown(self):
if self.offline_timer:
self.offline_timer.cancel()
def check_offline_lbs(self):
# Work out if it is our turn to run
minute = datetime.now().minute
if self.server_id != minute % self.number_of_servers:
LOG.info('Not our turn to run OFFLINE check, sleeping')
self.start_offline_sched()
return
tested = 0
failed = 0
try:
tested, failed = self._exec_offline_check()
except Exception:
LOG.exception('Uncaught exception during OFFLINE check')
# Need to restart timer after every ping cycle
LOG.info(
'{tested} OFFLINE loadbalancers tested, {failed} failed'
.format(tested=tested, failed=failed)
)
self.start_offline_sched()
def _exec_offline_check(self):
tested = 0
failed = 0
node_list = []
LOG.info('Running OFFLINE check')
with db_session() as session:
# Join to ensure device is in-use
devices = session.query(
Device.id, Device.name
).filter(Device.status == 'OFFLINE').all()
tested = len(devices)
if tested == 0:
LOG.info('No OFFLINE Load Balancers to check')
return (0, 0)
for lb in devices:
node_list.append(lb.name)
gearman = GearJobs()
failed_lbs = gearman.offline_check(node_list)
failed = len(failed_lbs)
if failed > self.error_limit:
LOG.error(
'Too many simultaneous Load Balancer Failures.'
' Aborting deletion attempt'
)
return tested, failed
if failed > 0:
self._send_delete(failed_lbs)
# Clear the ping counts for all devices not in
# the failed list
succeeded = list(set(node_list) - set(failed_lbs))
session.query(Device.name, Device.pingCount).\
filter(Device.name.in_(succeeded)).\
update({"pingCount": 0}, synchronize_session='fetch')
session.commit()
return tested, failed
def _send_delete(self, failed_nodes):
with db_session() as session:
for lb in failed_nodes:
# Get the current ping count
data = session.query(
Device.id, Device.pingCount).\
filter(Device.name == lb).first()
if not data:
LOG.error(
'Device {0} no longer exists'.format(lb)
)
continue
if data.pingCount < self.ping_limit:
data.pingCount += 1
LOG.error(
'Offline Device {0} has failed {1} ping attempts'.
format(lb, data.pingCount)
)
session.query(Device).\
filter(Device.name == lb).\
update({"pingCount": data.pingCount},
synchronize_session='fetch')
session.flush()
continue
message = (
'Load balancer {0} unreachable and marked for deletion'.
format(lb)
)
for driver in self.drivers:
instance = driver()
LOG.info(
'Sending delete request for {0} to {1}'.format(
lb, instance.__class__.__name__
)
)
instance.send_delete(message, data.id)
session.commit()
def start_offline_sched(self):
# Always try to hit the expected second mark for offline checks
seconds = datetime.now().second
if seconds < self.OFFLINE_SECONDS:
sleeptime = self.OFFLINE_SECONDS - seconds
else:
sleeptime = 60 - (seconds - self.OFFLINE_SECONDS)
LOG.info('LB offline check timer sleeping for {secs} seconds'
.format(secs=sleeptime))
self.offline_timer = threading.Timer(
sleeptime, self.check_offline_lbs, ()
)
self.offline_timer.start()


@@ -16,29 +16,20 @@ import threading
from datetime import datetime
from oslo.config import cfg
from libra.common.api.lbaas import LoadBalancer, Device, Node, db_session
from libra.openstack.common import log
from libra.openstack.common import log as logging
from libra.admin_api.stats.stats_gearman import GearJobs
LOG = log.getLogger(__name__)
LOG = logging.getLogger(__name__)
class NodeNotFound(Exception):
pass
class PingStats(object):
class Stats(object):
PING_SECONDS = 15
OFFLINE_SECONDS = 45
PING_SECONDS = cfg.CONF['admin_api'].ping_timer_seconds
def __init__(self, drivers):
self.drivers = drivers
self.ping_timer = None
self.offline_timer = None
self.ping_limit = cfg.CONF['admin_api']['stats_offline_ping_limit']
self.error_limit = cfg.CONF['admin_api']['stats_device_error_limit']
self.server_id = cfg.CONF['admin_api']['server_id']
self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
@@ -46,33 +37,10 @@ class Stats(object):
LOG.info("Selected stats drivers: %s", self.stats_driver)
self.start_ping_sched()
self.start_offline_sched()
def shutdown(self):
if self.ping_timer:
self.ping_timer.cancel()
if self.offline_timer:
self.offline_timer.cancel()
def check_offline_lbs(self):
# Work out if it is our turn to run
minute = datetime.now().minute
if self.server_id != minute % self.number_of_servers:
LOG.info('Not our turn to run OFFLINE check, sleeping')
self.start_offline_sched()
return
tested = 0
failed = 0
try:
tested, failed = self._exec_offline_check()
except Exception:
LOG.exception('Uncaught exception during OFFLINE check')
# Need to restart timer after every ping cycle
LOG.info(
'{tested} OFFLINE loadbalancers tested, {failed} failed'
.format(tested=tested, failed=failed)
)
self.start_offline_sched()
def ping_lbs(self):
# Work out if it is our turn to run
@@ -126,47 +94,6 @@ class Stats(object):
return pings, failed
def _exec_offline_check(self):
tested = 0
failed = 0
node_list = []
LOG.info('Running OFFLINE check')
with db_session() as session:
# Join to ensure device is in-use
devices = session.query(
Device.id, Device.name
).filter(Device.status == 'OFFLINE').all()
tested = len(devices)
if tested == 0:
LOG.info('No OFFLINE Load Balancers to check')
return (0, 0)
for lb in devices:
node_list.append(lb.name)
gearman = GearJobs()
failed_lbs = gearman.offline_check(node_list)
failed = len(failed_lbs)
if failed > self.error_limit:
LOG.error(
'Too many simultaneous Load Balancer Failures.'
' Aborting deletion attempt'
)
return tested, failed
if failed > 0:
self._send_delete(failed_lbs)
# Clear the ping counts for all devices not in
# the failed list
succeeded = list(set(node_list) - set(failed_lbs))
session.query(Device.name, Device.pingCount).\
filter(Device.name.in_(succeeded)).\
update({"pingCount": 0}, synchronize_session='fetch')
session.commit()
return tested, failed
def _send_fails(self, failed_lbs):
with db_session() as session:
for lb in failed_lbs:
@@ -196,47 +123,6 @@ class Stats(object):
instance.send_alert(message, data.id)
session.commit()
def _send_delete(self, failed_nodes):
with db_session() as session:
for lb in failed_nodes:
# Get the current ping count
data = session.query(
Device.id, Device.pingCount).\
filter(Device.name == lb).first()
if not data:
LOG.error(
'Device {0} no longer exists'.format(data.id)
)
continue
if data.pingCount < self.ping_limit:
data.pingCount += 1
LOG.error(
'Offline Device {0} has failed {1} ping attempts'.
format(lb, data.pingCount)
)
session.query(Device).\
filter(Device.name == lb).\
update({"pingCount": data.pingCount},
synchronize_session='fetch')
session.flush()
continue
message = (
'Load balancer {0} unreachable and marked for deletion'.
format(lb)
)
for driver in self.drivers:
instance = driver()
LOG.info(
'Sending delete request for {0} to {1}'.format(
lb, instance.__class__.__name__
)
)
instance.send_delete(message, data.id)
session.commit()
def _get_lb(self, lb, session):
lb = session.query(
LoadBalancer.tenantid, Device.floatingIpAddr, Device.id
@@ -358,17 +244,3 @@ class Stats(object):
LOG.info('LB ping check timer sleeping for %d seconds', sleeptime)
self.ping_timer = threading.Timer(sleeptime, self.ping_lbs, ())
self.ping_timer.start()
def start_offline_sched(self):
# Always try to hit the expected second mark for offline checks
seconds = datetime.now().second
if seconds < self.OFFLINE_SECONDS:
sleeptime = self.OFFLINE_SECONDS - seconds
else:
sleeptime = 60 - (seconds - self.OFFLINE_SECONDS)
LOG.info('LB offline check timer sleeping for %d seconds', sleeptime)
self.offline_timer = threading.Timer(
sleeptime, self.check_offline_lbs, ()
)
self.offline_timer.start()


@@ -47,7 +47,7 @@ class GearJobs(object):
failed_list = []
node_status = dict()
retry_list = []
job_data = {"hpcs_action": "STATS"}
job_data = {"hpcs_action": "PING"}
for node in node_list:
list_of_jobs.append(dict(task=str(node), data=job_data))
submitted_pings = self.gm_client.submit_multiple_jobs(
@@ -143,3 +143,62 @@ class GearJobs(object):
if gearman_fail > max_fail_count:
failed_list.append(ping.job.task)
return failed_list
def get_stats(self, node_list):
# TODO: lots of duplicated code that needs cleanup
list_of_jobs = []
failed_list = []
retry_list = []
results = {}
job_data = {"hpcs_action": "STATS"}
for node in node_list:
list_of_jobs.append(dict(task=str(node), data=job_data))
submitted_stats = self.gm_client.submit_multiple_jobs(
list_of_jobs, background=False, wait_until_complete=True,
poll_timeout=self.poll_timeout
)
for stats in submitted_stats:
if stats.state == JOB_UNKNOWN:
# TODO: Gearman server failed, ignoring for now
retry_list.append(stats.job.task)
if stats.timed_out:
# Timeout
retry_list.append(stats.job.task)
if stats.result['hpcs_response'] == 'FAIL':
# Error returned by Gearman
failed_list.append(stats.job.task)
else:
#Success
results[stats.job.task] = stats.result
list_of_jobs = []
if len(retry_list) > 0:
LOG.info(
"{0} stats timed out, retrying".format(len(retry_list))
)
for node in retry_list:
list_of_jobs.append(dict(task=str(node), data=job_data))
submitted_stats = self.gm_client.submit_multiple_jobs(
list_of_jobs, background=False, wait_until_complete=True,
poll_timeout=self.poll_retry
)
for stats in submitted_stats:
if stats.state == JOB_UNKNOWN:
# TODO: Gearman server failed, ignoring for now
LOG.error(
"Gearman Job server failed during STATS check of {0}".
format(stats.job.task)
)
failed_list.append(stats.job.task)
if stats.timed_out:
# Timeout
failed_list.append(stats.job.task)
if stats.result['hpcs_response'] == 'FAIL':
# Error returned by Gearman
failed_list.append(stats.job.task)
continue
else:
#Success
results[stats.job.task] = stats.result
return failed_list, results


@@ -0,0 +1,217 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import datetime
from oslo.config import cfg
from libra.common.api.lbaas import LoadBalancer, Device, db_session
from libra.common.api.lbaas import Billing, Stats
from libra.admin_api.stats.stats_gearman import GearJobs
from libra.openstack.common import timeutils
from libra.openstack.common import log as logging
from sqlalchemy.sql import func
LOG = logging.getLogger(__name__)
class UsageStats(object):
STATS_SECONDS = cfg.CONF['admin_api'].stats_timer_seconds
def __init__(self, drivers):
self.drivers = drivers
self.stats_timer = None
self.server_id = cfg.CONF['admin_api']['server_id']
self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
self.stats_freq = cfg.CONF['admin_api'].stats_freq
self.billing_enable = cfg.CONF['admin_api'].billing_enable
self.start_stats_sched()
def shutdown(self):
if self.stats_timer:
self.stats_timer.cancel()
def gather_stats(self):
# Work out if it is our turn to run
minute = datetime.datetime.now().minute
if self.server_id != minute % self.number_of_servers:
self.start_stats_sched()
return
total = 0
fail = 0
try:
fail, total = self._exec_stats()
except Exception:
LOG.exception('Uncaught exception during stats collection')
# Need to restart timer after every stats cycle
LOG.info('{total} lb device stats queried, {fail} failed'
.format(total=total, fail=fail))
self.start_stats_sched()
def _exec_stats(self):
failed = 0
node_list = []
with db_session() as session:
delta = datetime.timedelta(minutes=self.stats_freq)
exp = timeutils.utcnow() - delta
exp_time = exp.strftime('%Y-%m-%d %H:%M:%S')
updated = session.query(
Billing.last_update
).filter(Billing.name == "stats").\
filter(Billing.last_update > exp_time).\
first()
if updated is not None:
# Not time yet
LOG.info('Not time to gather stats yet {0}'.format(exp_time))
session.rollback()
return 0, 0
#Update the stats timestamp
session.query(Billing).\
filter(Billing.name == "stats").\
update({"last_update": func.now()},
synchronize_session='fetch')
# Get all the online devices to query for stats
devices = session.query(
Device.id, Device.name
).filter(Device.status == 'ONLINE').all()
if devices is None or len(devices) == 0:
LOG.error('No ONLINE devices to gather usage stats from')
session.rollback()
return 0, 0
total = len(devices)
for device in devices:
node_list.append(device.name)
gearman = GearJobs()
failed_list, results = gearman.get_stats(node_list)
failed = len(failed_list)
if failed > 0:
self._send_fails(failed_list)
if total > failed:
# We have some success
self._update_stats(results, failed_list)
session.commit()
else:
# Everything failed. Retry these on the next timer firing
session.rollback()
return failed, total
def _update_stats(self, results, failed_list):
with db_session() as session:
lbs = session.query(
LoadBalancer.id,
LoadBalancer.protocol,
LoadBalancer.status,
Device.name
).join(LoadBalancer.devices).\
filter(Device.status == 'ONLINE').all()
if lbs is None:
session.rollback()
LOG.error('No Loadbalancers found when updating stats')
return
total = len(lbs)
added = 0
for lb in lbs:
if lb.name not in results:
if lb.name not in failed_list:
LOG.error(
'No stats results found for Device {0}, LBID {1}'
.format(lb.name, lb.id))
continue
result = results[lb.name]
protocol = lb.protocol.lower()
if protocol != "http":
# GALERA or TCP = TCP at the worker
protocol = "tcp"
bytes_out = -1
for data in result["loadBalancers"]:
if data["protocol"] == protocol:
bytes_out = data["bytes_out"]
if bytes_out == -1:
LOG.error(
'No stats found for Device {0}, '
'LBID {1}, protocol {2}'
.format(lb.name, lb.id, protocol))
continue
new_entry = Stats()
new_entry.lbid = lb.id
new_entry.period_start = result["utc_start"]
new_entry.period_end = result["utc_end"]
new_entry.bytes_out = bytes_out
new_entry.status = lb.status
session.add(new_entry)
session.flush()
added += 1
session.commit()
LOG.info(
'{total} loadbalancers stats queried, {fail} failed'
.format(total=total, fail=total - added))
def _send_fails(self, failed_list):
with db_session() as session:
for device_name in failed_list:
data = self._get_lb(device_name, session)
if not data:
LOG.error(
'Device {0} has no Loadbalancer attached during STATS'.
format(device_name)
)
continue
LOG.error(
'Load balancer failed STATS request '
'ID: {0}\n'
'IP: {1}\n'
'tenant: {2}\n'.format(
data.id, data.floatingIpAddr,
data.tenantid))
def _get_lb(self, lb, session):
lb = session.query(
LoadBalancer.tenantid, Device.floatingIpAddr, Device.id
).join(LoadBalancer.devices).\
filter(Device.name == lb).first()
return lb
def start_stats_sched(self):
# Always try to hit the expected second mark for stats
seconds = datetime.datetime.now().second
if seconds < self.STATS_SECONDS:
sleeptime = self.STATS_SECONDS - seconds
else:
sleeptime = 60 - (seconds - self.STATS_SECONDS)
LOG.info('LB stats timer sleeping for {secs} seconds'
.format(secs=sleeptime))
self.stats_timer = threading.Timer(sleeptime, self.gather_stats, ())
self.stats_timer.start()


@@ -33,6 +33,7 @@ from libra.common.exc import ExhaustedError
from libra.api.model.validators import LBPut, LBPost, LBResp, LBVipResp
from libra.api.model.validators import LBRespNode
from libra.common.api.gearman_client import submit_job
from libra.common.api.mnb import update_mnb
from libra.api.acl import get_limited_to_project
from libra.api.library.exp import OverLimit, IPOutOfRange, NotFound
from libra.api.library.exp import ImmutableEntity, ImmutableStates
@@ -469,6 +470,9 @@ class LoadBalancersController(RestController):
'UPDATE', device.name, device.id, lb.id
)
#Notify billing of the LB creation
update_mnb('lbaas.instance.create', lb.id, tenant_id)
return return_data
@wsme_pecan.wsexpose(None, body=LBPut, status_code=202)
@@ -579,6 +583,9 @@ class LoadBalancersController(RestController):
submit_job(
'DELETE', device.name, device.id, lb.id
)
#Notify billing of the LB deletion
update_mnb('lbaas.instance.delete', lb.id, tenant_id)
return None
def usage(self, load_balancer_id):


@@ -33,7 +33,8 @@ gearman_workers = [
'DELETE', # Delete a Load Balancer.
'DISCOVER', # Return service discovery information.
'ARCHIVE', # Archive LB log files.
'STATS' # Get load balancer statistics.
'STATS', # Get load balancer statistics.
'PING' # Ping load balancers
]


@@ -20,7 +20,7 @@ import time
from oslo.config import cfg
from pecan import conf
from sqlalchemy import Table, Column, Integer, ForeignKey, create_engine
from sqlalchemy import INTEGER, VARCHAR, BIGINT
from sqlalchemy import INTEGER, VARCHAR, BIGINT, DATETIME
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, backref, sessionmaker, Session
@@ -151,6 +151,28 @@ class HealthMonitor(DeclarativeBase):
path = Column(u'path', VARCHAR(length=2000))
class Billing(DeclarativeBase):
__tablename__ = 'billing'
id = Column(u'id', Integer, primary_key=True, nullable=False)
name = Column(u'name', VARCHAR(length=128), nullable=False)
last_update = Column(u'last_update', DATETIME(), nullable=False)
class Stats(DeclarativeBase):
"""stats model"""
__tablename__ = 'stats'
#column definitions
id = Column(u'id', BIGINT(), primary_key=True, nullable=False)
lbid = Column(
u'lbid', BIGINT(), ForeignKey('loadbalancers.id'), primary_key=True,
nullable=False
)
period_start = Column(u'period_start', DATETIME(), nullable=False)
period_end = Column(u'period_end', DATETIME(), nullable=False)
bytes_out = Column(u'bytes_out', BIGINT(), nullable=False)
status = Column(u'status', VARCHAR(length=50), nullable=False)
class RoutingSession(Session):
""" Try to use the first engine provided. If this fails use the next in
sequence and so on. Reset to the first after 60 seconds


@@ -72,7 +72,7 @@ CREATE TABLE monitors (
timeout INT NOT NULL, # Maximum number of seconds to wait for a connection to the node before it times out.
attemptsBeforeDeactivation INT NOT NULL, # Number of permissible failures before removing a node from rotation. 1 to 10.
path VARCHAR(2000) NULL, # The HTTP path used in the request by the monitor. Begins with /
PRIMARY KEY (lbid) # ids are unique accross all Nodes
PRIMARY KEY (lbid) # ids are unique across all Nodes
) DEFAULT CHARSET utf8 DEFAULT COLLATE utf8_general_ci;
CREATE TABLE `pool_building` (
@@ -100,3 +100,24 @@ CREATE TABLE `global_limits` (
INSERT INTO `global_limits` VALUES (1,'maxLoadBalancerNameLength',128),(2,'maxVIPsPerLoadBalancer',1),(3,'maxNodesPerLoadBalancer',50),(4,'maxLoadBalancers',20);
# Billing
CREATE TABLE billing (
id int(11) NOT NULL,
name varchar(128) NOT NULL,
last_update DATETIME NOT NULL DEFAULT '0000-00-00 00:00:00', # timestamp of when the feature was last updated
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET latin1;
INSERT INTO billing VALUES (1, 'stats', '0000-00-00 00:00:00'),(2, 'usage', '0000-00-00 00:00:00'),(3, 'exists', '0000-00-00 00:00:00');
# Stats
CREATE TABLE stats (
id BIGINT NOT NULL AUTO_INCREMENT, # unique id for this billing record
lbid BIGINT NOT NULL REFERENCES loadbalancers(id), # fk for lbid
period_start DATETIME NOT NULL, # timestamp of when this period started
period_end DATETIME NOT NULL, # timestamp of when this period ended
bytes_out BIGINT NOT NULL, # bytes transferred in this period
status VARCHAR(50) NOT NULL, # Current LB status
PRIMARY KEY (id) # ids are unique across all LBs
) ENGINE=InnoDB DEFAULT CHARSET latin1;

libra/common/api/mnb.py Normal file

@@ -0,0 +1,367 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import eventlet
eventlet.monkey_patch()
from oslo.config import cfg
from libra.common.api.lbaas import LoadBalancer, db_session
from libra.common.api.lbaas import Stats
from libra.openstack.common.notifier import api as notifier_api
from libra.openstack.common import timeutils
from libra.openstack.common import log as logging
from libra.openstack.common import rpc
from libra.openstack.common.rpc import common as rpc_common
from sqlalchemy.sql import func
LOG = logging.getLogger(__name__)
def update_mnb(event_type, lbid, tenant_id):
# Start a new thread
eventlet.spawn_n(client_job, event_type, lbid, tenant_id)
def client_job(event_type, lbid, tenant_id):
try:
if(event_type == 'lbaas.instance.create' or
event_type == 'lbaas.instance.delete'):
_send_create_or_delete(event_type, lbid, tenant_id)
elif event_type == 'lbaas.instance.exists':
_send_exists(event_type)
elif event_type == 'lbaas.bandwidth.usage':
_send_usage(event_type, lbid, tenant_id)
elif event_type == 'lbaas.instance.test':
_send_test(event_type, lbid, tenant_id)
return
except:
LOG.exception("MnB notify: unhandled exception")
LOG.error("MnB notification unsuccessful. Type {0}, loadbalancer {1} "
"tenant_id {2}".format(event_type, lbid, tenant_id))
def _notify(service, event_type, payload):
priority = cfg.CONF.default_notification_level
publisher_id = notifier_api.publisher_id(service)
notifier_api.notify(None, publisher_id, event_type, priority, payload)
def test_mnb_connection():
# Because the oslo notifier code does not have a return status
# and exceptions are caught inside oslo (I know...), the best we
# can do here is use the oslo rpc code to try a test connection
# to the MnB servers before the notification(s) are sent.
connected = False
try:
cx = rpc.create_connection()
cx.close()
LOG.info("Verified RPC connection is ready")
connected = True
except rpc_common.RPCException as e:
LOG.error("RPC connect exception: %s", e)
except Exception as e:
LOG.error("Non-RPC connect exception: %s", e)
return connected
def _send_create_or_delete(event_type, lbid, tenant_id):
LOG.info(
"Sending MnB {0} notification to MnB for "
"loadbalancer {1} tenant_id {2}".format(
event_type, lbid, tenant_id)
)
if not test_mnb_connection():
# Abort the notification
if event_type == 'lbaas.instance.create':
LOG.info("Aborting Create Notifications. Could not connect")
else:
LOG.info("Aborting Delete Notifications. Could not connect")
return
with db_session() as session:
lb = session.query(
LoadBalancer.name,
LoadBalancer.status,
LoadBalancer.created,
LoadBalancer.updated
).filter(LoadBalancer.id == lbid).\
filter(LoadBalancer.tenantid == tenant_id).first()
if lb is None:
session.rollback()
LOG.error("Load Balancer {0} not found for tenant {1}".format(
lbid, tenant_id))
return
if event_type == 'lbaas.instance.create':
date = lb.created
else:
date = lb.updated
# Build the payload
payload = _build_payload(date, date, lb.name, lbid,
tenant_id, lb.status)
_notify('lbaas', event_type, payload)
session.commit()
def _send_exists(event_type):
LOG.info("Sending MnB {0} notifications to MnB".format(event_type))
count = 0
with db_session() as session:
lbs = session.query(
LoadBalancer.id,
LoadBalancer.tenantid,
LoadBalancer.name,
LoadBalancer.status,
LoadBalancer.created,
LoadBalancer.updated
).filter(LoadBalancer.status != 'DELETED').all()
if not lbs:
session.rollback()
LOG.error("No existing Load Balancers found")
return
# Figure out our audit period beginning/ending
seconds = (cfg.CONF['admin_api'].exists_freq * 60)
interval = datetime.timedelta(seconds=seconds)
audit_period_ending = timeutils.utcnow()
audit_period_beginning = audit_period_ending - interval
audit_period_beginning = str(audit_period_beginning)
audit_period_ending = str(audit_period_ending)
for lb in lbs:
LOG.info(
"Sending MnB {0} notification to MnB for "
"loadbalancer {1} tenant_id {2}".format(
event_type, lb.id, lb.tenantid)
)
# Build the payload
payload = _build_payload(audit_period_beginning,
audit_period_ending,
lb.name, lb.id, lb.tenantid, lb.status)
_notify('lbaas', event_type, payload)
count += 1
session.commit()
LOG.info("Sent {0} MnB {1} notifications to MnB".format(count, event_type))
def _send_usage(event_type, start, stop):
LOG.info("Sending MnB {0} notifications to MnB".format(event_type))
N = cfg.CONF['admin_api'].usage_freq
with db_session() as session:
# Start by making sure we have stats in the Stats table and
# track the oldest value in case we need it below.
oldest, = session.query(Stats.period_end).\
order_by(Stats.id.asc()).first()
if oldest is None:
# No Stats at all
LOG.info("No usage statistics to send.")
session.rollback()
return
if start is None:
# The value in the DB must be '0000-00-00 00:00:00' so
# as a starting point, we can find the oldest stat in
# the Stats table and start from there. No sense iterating
# from 0000-00-00 to now looking for stats to send. Also
# round it back to the previous update period
start = _rounded_down_min(oldest, N)
LOG.info("Starting usage notifications from first saved {0}".
format(start))
# Now that we know where to start, make sure we have stats to
# send for the time period. Use stats that end in this period.
# It's ok if the stats started in a previous period. Some skew
# is allowed.
total = session.query(Stats).\
filter(Stats.period_end >= start).\
filter(Stats.period_end < stop).\
count()
if total == 0:
LOG.info("No usage statistics to send between {0} and {1}"
.format(start, stop))
session.rollback()
return
LOG.info("Found {0} total usage statistics to send between {1} and {2}"
.format(total, start, stop))
# Get info on all of our loadbalancers for the payloads.
loadbalancers = _get_lbs()
# Get ready to loop through however N minute periods we
# have to send. We do it this way rather than one lump sum
# because finer grain data is probably needed on the MnB side.
end = start + datetime.timedelta(minutes=N)
count = 0
while end <= stop:
# Loop through all N periods up to the current period
# sending usage notifications to MnB
stats = session.query(
Stats.lbid,
func.sum(Stats.bytes_out)
).group_by(Stats.lbid).\
filter(Stats.period_end >= start).\
filter(Stats.period_end < end).\
all()
# Prep for the next loop here in case of continue
prev_start = start
prev_end = end
start = end
end = start + datetime.timedelta(minutes=N)
if not stats:
LOG.info("No usage statistics to send for period {0} to {1}".
format(prev_start, prev_end))
continue
else:
LOG.info("Sending usage statistics for {0} to {1}".
format(prev_start, prev_end))
audit_period_beginning = str(prev_start)
audit_period_ending = str(prev_end)
for lb in stats:
lbid, byte_count = lb
if lbid not in loadbalancers:
LOG.error("Loadbalancer {0} not found in DB "
"not sending usage statistics".format(lbid))
continue
# Build the payload
payload = _build_payload(audit_period_beginning,
audit_period_ending,
loadbalancers[lbid]["name"],
lbid,
loadbalancers[lbid]["tenant_id"],
loadbalancers[lbid]["status"])
payload["metrics"] = _build_metrics(byte_count)
LOG.info(
"Sending MnB {0} notification to MnB for "
"loadbalancer {1} tenant_id {2} from "
"{3} to {4}: PAYLOAD = {5}".
format(event_type,
lbid,
loadbalancers[lbid]["tenant_id"],
prev_start,
prev_end,
payload)
)
_notify('lbaas', event_type, payload)
count += 1
# Purge old stats
if cfg.CONF['admin_api'].stats_purge_enable:
hours = cfg.CONF['admin_api'].stats_purge_days * 24
delta = datetime.timedelta(hours=hours)
exp = timeutils.utcnow() - delta
exp_time = exp.strftime('%Y-%m-%d %H:%M:%S')
purged = session.query(Stats).\
filter(Stats.period_end < exp_time).\
delete()
LOG.info("Purged {0} usage statistics from before {1}".
format(purged, exp_time))
session.commit()
LOG.info("Sent {0} MnB {1} notifications to MnB".format(count, event_type))
def _send_test(event_type, lbid, tenant_id):
# Build the payload
now = str(timeutils.utcnow())
LOG.error("Sending {0} test notifications".format(lbid))
if not test_mnb_connection():
# Abort the test notifications
LOG.info("Aborting test Notifications. Could not connect")
return
#Note lbid is the number of notifications to send
lbid += 1
for x in xrange(1, lbid):
payload = _build_payload(now, now, "Test LB", str(x),
str(tenant_id), 'active')
_notify('lbaas', 'lbaas.instance.test', payload)
def _build_payload(begin, end, name, id, tenant, status):
return {
"audit_period_beginning": begin,
"audit_period_ending": end,
"display_name": name,
"id": id,
"type": "lbaas.std",
"type_id": 1,
"tenant_id": tenant,
"state": status.lower(),
"state_description": status.lower()
}
def _build_metrics(bytes):
return {
"metric_name": "lbaas.network.outgoing.bytes",
"metric_type": "gauge",
"metric_units": "BYTES",
"metric_value": bytes
}
def _rounded_down_min(ts, N):
ts = ts - datetime.timedelta(minutes=ts.minute % N,
seconds=ts.second,
microseconds=ts.microsecond)
return ts
def _get_lbs():
all_lbs = {}
with db_session() as session:
lbs = session.query(
LoadBalancer.id,
LoadBalancer.tenantid,
LoadBalancer.name,
LoadBalancer.status,
).all()
for lb in lbs:
all_lbs[lb.id] = {
"tenant_id": lb.tenantid,
"name": lb.name,
"status": lb.status
}
session.commit()
return all_lbs


@@ -16,3 +16,4 @@ wsme>=0.5b2
mysql-connector-python
ipaddress==1.0.4
six<1.4.0
kombu