diff --git a/etc/libra.cfg b/etc/libra.cfg index 16c28159..72e03a7b 100644 --- a/etc/libra.cfg +++ b/etc/libra.cfg @@ -51,12 +51,11 @@ #driver = haproxy #pid = /var/run/libra/libra_worker.pid -#logfile = /var/log/libra/libra_worker.log # HAProxy driver options for the worker [worker:haproxy] #service = ubuntu -#logfile = /var/log/haproxy.log +#statsfile = /var/log/haproxy.stats #----------------------------------------------------------------------- @@ -67,7 +66,6 @@ # Options with defaults #pid = /var/run/libra/libra_mgm.pid -#logfile = /var/log/libra/libra_mgm.log #threads = 4 #rm_fip_ignore_500 = false #nova_insecure = false @@ -101,7 +99,6 @@ nova_tenant_id = TENANTID # Options with defaults #host = 0.0.0.0 #port = 8889 -#logfile = /var/log/libra/libra_admin_api.log #pid = /var/run/libra/libra_admin_api.pid #expire_days = 0 #node_pool_size = 10 @@ -140,7 +137,6 @@ datadog_tags = service:lbaas #host = 0.0.0.0 #port = 443 #keystone_module = keystoneclient.middleware.auth_token:AuthProtocol -#logfile = /var/log/libra/libra_api.log #pid = /var/run/libra/libra_api.pid # Required options diff --git a/libra/tests/worker/test_lbstats.py b/libra/tests/worker/test_lbstats.py deleted file mode 100644 index 48983845..00000000 --- a/libra/tests/worker/test_lbstats.py +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright 2013 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -import datetime -from libra.tests.base import TestCase -from libra.worker.drivers.haproxy.lbstats import LBStatistics - - -class TestLBStatistics(TestCase): - def setUp(self): - super(TestLBStatistics, self).setUp() - self.stats = LBStatistics() - - def testInitValues(self): - now = datetime.datetime.utcnow() - ts = self.stats.utc_timestamp - self.assertEquals(ts.year, now.year) - self.assertEquals(ts.month, now.month) - self.assertEquals(ts.day, now.day) - self.assertEquals(ts.hour, now.hour) - self.assertEquals(self.stats.bytes_out, 0L) - self.assertEquals(self.stats.bytes_in, 0L) - - def testSetBytesIn(self): - self.stats.bytes_in = 99L - self.assertEquals(self.stats.bytes_in, 99L) - e = self.assertRaises(TypeError, setattr, self.stats, - 'bytes_in', "NaN") - self.assertEqual("Must be a long integer: 'NaN'", e.message) - - def testSetBytesOut(self): - self.stats.bytes_out = 100L - self.assertEquals(self.stats.bytes_out, 100L) - e = self.assertRaises(TypeError, setattr, self.stats, - 'bytes_out', "NaN") - self.assertEqual("Must be a long integer: 'NaN'", e.message) - - def testSetUTCTimestamp(self): - ts = datetime.datetime.utcnow() - self.stats.utc_timestamp = ts - self.assertEquals(self.stats.utc_timestamp, ts) - e = self.assertRaises(TypeError, setattr, self.stats, - 'utc_timestamp', "NaN") - self.assertEqual("Must be a datetime.datetime: 'NaN'", e.message) diff --git a/libra/tests/worker/test_stats.py b/libra/tests/worker/test_stats.py new file mode 100644 index 00000000..bc86f31d --- /dev/null +++ b/libra/tests/worker/test_stats.py @@ -0,0 +1,90 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import os.path +import tempfile + +from libra.tests.base import TestCase +from libra.worker.drivers.haproxy.stats import StatisticsManager + + +class TestStatisticsManager(TestCase): + + def setUp(self): + super(TestStatisticsManager, self).setUp() + self.tmpfile = tempfile.gettempdir() + "/tstLibraTestStatsMgr.tmp" + self.mgr = StatisticsManager(self.tmpfile) + + def tearDown(self): + if os.path.exists(self.tmpfile): + os.remove(self.tmpfile) + super(TestStatisticsManager, self).tearDown() + + def testReadNoStatsFile(self): + self.assertEquals(self.mgr.get_start(), None) + self.assertEquals(self.mgr.get_end(), None) + self.assertEquals(self.mgr.get_last_tcp_bytes(), 0) + self.assertEquals(self.mgr.get_last_http_bytes(), 0) + self.assertEquals(self.mgr.get_unreported_tcp_bytes(), 0) + self.assertEquals(self.mgr.get_unreported_http_bytes(), 0) + + def testSave(self): + start_ts = datetime.datetime(2013, 1, 31, 12, 10, 30, 123456) + end_ts = start_ts + datetime.timedelta(minutes=5) + tcp_bytes = 1024 + http_bytes = 2048 + unreported_tcp_bytes = 3000 + unreported_http_bytes = 4000 + + self.mgr.save(start_ts, end_ts, + tcp_bytes=tcp_bytes, http_bytes=http_bytes) + self.mgr.read() + + self.assertEquals(self.mgr.get_start(), start_ts) + self.assertEquals(self.mgr.get_end(), end_ts) + self.assertEquals(self.mgr.get_last_tcp_bytes(), tcp_bytes) + self.assertEquals(self.mgr.get_last_http_bytes(), http_bytes) + self.assertEquals(self.mgr.get_unreported_tcp_bytes(), 0) + self.assertEquals(self.mgr.get_unreported_http_bytes(), 0) + + 
self.mgr.save(start_ts, end_ts, + unreported_tcp_bytes=unreported_tcp_bytes, + unreported_http_bytes=unreported_http_bytes) + self.mgr.read() + + self.assertEquals(self.mgr.get_start(), start_ts) + self.assertEquals(self.mgr.get_end(), end_ts) + self.assertEquals(self.mgr.get_last_tcp_bytes(), 0) + self.assertEquals(self.mgr.get_last_http_bytes(), 0) + self.assertEquals(self.mgr.get_unreported_tcp_bytes(), + unreported_tcp_bytes) + self.assertEquals(self.mgr.get_unreported_http_bytes(), + unreported_http_bytes) + + self.mgr.save(start_ts, end_ts, + tcp_bytes=tcp_bytes, + http_bytes=http_bytes, + unreported_tcp_bytes=unreported_tcp_bytes, + unreported_http_bytes=unreported_http_bytes) + self.mgr.read() + + self.assertEquals(self.mgr.get_start(), start_ts) + self.assertEquals(self.mgr.get_end(), end_ts) + self.assertEquals(self.mgr.get_last_tcp_bytes(), tcp_bytes) + self.assertEquals(self.mgr.get_last_http_bytes(), http_bytes) + self.assertEquals(self.mgr.get_unreported_tcp_bytes(), + unreported_tcp_bytes) + self.assertEquals(self.mgr.get_unreported_http_bytes(), + unreported_http_bytes) diff --git a/libra/worker/controller.py b/libra/worker/controller.py index a39b1041..3681d9f1 100644 --- a/libra/worker/controller.py +++ b/libra/worker/controller.py @@ -21,7 +21,7 @@ from libra import __release__ as libra_release from libra.common.exc import DeletedStateError from libra.common.faults import BadRequest from libra.openstack.common import log -from libra.worker.drivers.base import LoadBalancerDriver +from libra.worker.drivers import base LOG = log.getLogger(__name__) @@ -71,8 +71,7 @@ class LBaaSController(object): elif action == 'ARCHIVE': return self._action_archive() elif action == 'STATS': - # TODO: Implement new STATS function - return self._action_ping() + return self._action_stats() elif action == 'PING': return self._action_ping() elif action == 'DIAGNOSTICS': @@ -203,15 +202,15 @@ class LBaaSController(object): if 'algorithm' in current_lb: algo = 
current_lb['algorithm'].upper() if algo == 'ROUND_ROBIN': - algo = LoadBalancerDriver.ROUNDROBIN + algo = base.LoadBalancerDriver.ROUNDROBIN elif algo == 'LEAST_CONNECTIONS': - algo = LoadBalancerDriver.LEASTCONN + algo = base.LoadBalancerDriver.LEASTCONN else: LOG.error("Invalid algorithm: %s" % algo) self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE return self.msg else: - algo = LoadBalancerDriver.ROUNDROBIN + algo = base.LoadBalancerDriver.ROUNDROBIN try: self.driver.set_algorithm(current_lb['protocol'], algo) @@ -451,26 +450,69 @@ class LBaaSController(object): """ try: - stats = self.driver.get_status() + nodes = self.driver.get_status() except NotImplementedError: error = "Selected driver does not support PING action." LOG.error(error) self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE self.msg[self.ERROR_FIELD] = error except DeletedStateError: - LOG.info("Invalid operation PING on a deleted LB") + error = "Invalid operation PING on a deleted LB." + LOG.error(error) self.msg['status'] = 'DELETED' self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + self.msg[self.ERROR_FIELD] = error except Exception as e: LOG.error("PING failed: %s, %s" % (e.__class__, e)) self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE self.msg[self.ERROR_FIELD] = str(e) else: - node_status = stats.node_status_map() self.msg['nodes'] = [] - for node in node_status.keys(): - self.msg['nodes'].append({'id': node, - 'status': node_status[node]}) + for node, status in nodes: + self.msg['nodes'].append({'id': node, 'status': status}) self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS return self.msg + + def _action_stats(self): + """ + Get load balancer statistics + + This type of request gets the number of bytes out for each load + balancer defined on the device. If both a TCP and HTTP load + balancer exist, we report on each in a single response. 
+ """ + + try: + start, end, statistics = self.driver.get_statistics() + except NotImplementedError: + error = "Selected driver does not support STATS action." + LOG.error(error) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + self.msg[self.ERROR_FIELD] = error + return self.msg + except DeletedStateError: + error = "Invalid operation STATS on a deleted LB." + LOG.error(error) + self.msg['status'] = 'DELETED' + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + self.msg[self.ERROR_FIELD] = error + return self.msg + except Exception as e: + LOG.error("STATS failed: %s, %s" % (e.__class__, e)) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + self.msg[self.ERROR_FIELD] = str(e) + return self.msg + + self.msg['utc_start'] = start + self.msg['utc_end'] = end + self.msg['loadBalancers'] = [] + + # We should have a list of tuples pairing the number of bytes + # out with the protocol/LB. + for proto, bytes_out in statistics: + self.msg['loadBalancers'].append({'protocol': proto, + 'bytes_out': bytes_out}) + + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS + return self.msg diff --git a/libra/worker/drivers/base.py b/libra/worker/drivers/base.py index 003997ac..35f919cf 100644 --- a/libra/worker/drivers/base.py +++ b/libra/worker/drivers/base.py @@ -97,7 +97,26 @@ class LoadBalancerDriver(object): raise NotImplementedError() def get_status(self, protocol): - """ Get load balancer status for specified protocol. """ + """ + Get load balancer status for specified protocol. + + Returns a list of tuples containing (in this order): + - node ID + - node status + """ + raise NotImplementedError() + + def get_statistics(self): + """ + Get load balancer statistics for all LBs on the device. 
+ + Returns a tuple containing (in this order): + - start timestamp for the reporting period as a string + - end timestamp for the reporting period as a string + - list of tuples containing (in this order): + - protocol for the LB ('tcp' or 'http') as a string + - bytes out for this LB for this reporting period as an int + """ raise NotImplementedError() def archive(self, method, params): diff --git a/libra/worker/drivers/haproxy/__init__.py b/libra/worker/drivers/haproxy/__init__.py index c69799f0..db101aaa 100644 --- a/libra/worker/drivers/haproxy/__init__.py +++ b/libra/worker/drivers/haproxy/__init__.py @@ -26,6 +26,9 @@ cfg.CONF.register_opts( cfg.StrOpt('logfile', default='/var/log/haproxy.log', help='Location of HAProxy logfile'), + cfg.StrOpt('statsfile', + default='/var/log/haproxy.stats', + help='Location of the HAProxy statistics cache file'), ], group=haproxy_group ) diff --git a/libra/worker/drivers/haproxy/driver.py b/libra/worker/drivers/haproxy/driver.py index a5be917b..9082ec7d 100644 --- a/libra/worker/drivers/haproxy/driver.py +++ b/libra/worker/drivers/haproxy/driver.py @@ -396,6 +396,9 @@ class HAProxyDriver(LoadBalancerDriver): def get_status(self, protocol=None): return self.ossvc.get_status(protocol) + def get_statistics(self): + return self.ossvc.get_statistics() + def archive(self, method, params): """ Implementation of the archive() API call. diff --git a/libra/worker/drivers/haproxy/lbstats.py b/libra/worker/drivers/haproxy/lbstats.py deleted file mode 100644 index faee48f7..00000000 --- a/libra/worker/drivers/haproxy/lbstats.py +++ /dev/null @@ -1,64 +0,0 @@ -# Copyright 2012 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. 
You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import datetime - - -class LBStatistics(object): - """ Load balancer statistics class. """ - - def __init__(self): - self.stats = {} - self.bytes_out = 0L - self.bytes_in = 0L - self.nodes = dict() - self.utc_timestamp = datetime.datetime.utcnow() - - @property - def bytes_out(self): - return self.stats['bytes_out'] - - @bytes_out.setter - def bytes_out(self, value): - if not isinstance(value, long): - raise TypeError("Must be a long integer: '%s'" % value) - self.stats['bytes_out'] = value - - @property - def bytes_in(self): - return self.stats['bytes_in'] - - @bytes_in.setter - def bytes_in(self, value): - if not isinstance(value, long): - raise TypeError("Must be a long integer: '%s'" % value) - self.stats['bytes_in'] = value - - @property - def utc_timestamp(self): - """ UTC timestamp for when these statistics are generated. 
""" - return self._utc_ts - - @utc_timestamp.setter - def utc_timestamp(self, value): - if not isinstance(value, datetime.datetime): - raise TypeError("Must be a datetime.datetime: '%s'" % value) - self._utc_ts = value - - def add_node_status(self, node, status): - self.nodes[node] = status - - def node_status_map(self): - """ Return a dictionary, indexed by node ID, of the node status """ - return self.nodes diff --git a/libra/worker/drivers/haproxy/query.py b/libra/worker/drivers/haproxy/query.py index cd99ef7a..60a7502f 100644 --- a/libra/worker/drivers/haproxy/query.py +++ b/libra/worker/drivers/haproxy/query.py @@ -16,7 +16,13 @@ import subprocess class HAProxyQuery(object): - """ Class used for querying the HAProxy statistics socket. """ + """ + Class used for querying the HAProxy statistics socket. + + The CSV output is defined in the HAProxy documentation: + + http://cbonte.github.io/haproxy-dconv/configuration-1.4.html#9 + """ def __init__(self, stats_socket): """ @@ -59,8 +65,8 @@ class HAProxyQuery(object): object_type Select the type of dumpable object. Values can be ORed. -1 - everything - 1 - backends - 2 - frontents + 1 - frontends + 2 - backends 4 - servers server_id @@ -71,6 +77,30 @@ class HAProxyQuery(object): list_results = results.split('\n') return list_results + def get_bytes_out(self, protocol=None): + """ + Get bytes out for the given protocol, or all protocols if + not specified. + + Return a dictionary keyed by protocol with bytes out as the value. 
+ """ + if protocol: + filter_string = protocol.lower() + "-servers" + + results = self.show_stat(object_type=2) # backends only + + final_results = {} + for line in results[1:]: + elements = line.split(',') + if protocol and elements[0] != filter_string: + next + else: + proto, ignore = elements[0].split('-') + bytes_out = int(elements[9]) + final_results[proto.lower()] = bytes_out + + return final_results + def get_server_status(self, protocol=None): """ Get status for each server for a protocol backend. diff --git a/libra/worker/drivers/haproxy/services_base.py b/libra/worker/drivers/haproxy/services_base.py index 9baf14d8..2e8b85b9 100644 --- a/libra/worker/drivers/haproxy/services_base.py +++ b/libra/worker/drivers/haproxy/services_base.py @@ -50,8 +50,12 @@ class ServicesBase: """ Remove current and saved HAProxy config files. """ raise NotImplementedError() - def get_stats(self): - """ Get the stats from HAProxy. """ + def get_status(self, protocol): + """ Get status from HAProxy. """ + raise NotImplementedError() + + def get_statistics(self): + """ Get statistics from HAProxy. """ raise NotImplementedError() def sudo_copy(self, from_file, to_file): diff --git a/libra/worker/drivers/haproxy/stats.py b/libra/worker/drivers/haproxy/stats.py new file mode 100644 index 00000000..4238d964 --- /dev/null +++ b/libra/worker/drivers/haproxy/stats.py @@ -0,0 +1,162 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import os.path +import simplejson + + +class StatisticsManager(object): + """ + Class for managing statistics storage. + + Since HAProxy statistics are reset whenever the haproxy process is + restarted, we need a reliable way of maintaining these values across + restarts. This class attempts to manage the storage of the values. + + There are two types of statistics we record: + + * Unreported stats + These are stats that we need to save because a state change in + the HAProxy service is causing it to restart. Since HAProxy stores + its stats in memory, they would otherwise be lost. We save them here + for consideration in the next STATS request. + + * Last queried stats + These are total bytes out as reported from HAProxy the last time we + queried it for that information. + """ + + START_FIELD = 'start' + END_FIELD = 'end' + + # UNREPORTED_* values are for unreported statistics due to a restart + UNREPORTED_TCP_BYTES_FIELD = 'unreported_tcp_bytes_out' + UNREPORTED_HTTP_BYTES_FIELD = 'unreported_http_bytes_out' + + # LAST_* values are for values from our last query + LAST_TCP_BYTES_FIELD = 'last_tcp_bytes_out' + LAST_HTTP_BYTES_FIELD = 'last_http_bytes_out' + + def __init__(self, filename): + self.filename = filename + self._object = {} + self.read() + + def _do_save(self, obj): + with open(self.filename, "w") as fp: + simplejson.dump(obj, fp) + + def _format_timestamp(self, ts): + return datetime.datetime.strptime(ts, '%Y-%m-%d %H:%M:%S.%f') + + def save(self, start, end, tcp_bytes=0, http_bytes=0, + unreported_tcp_bytes=0, unreported_http_bytes=0): + """ + Save HAProxy statistics values, overwriting any existing data. + + start + Start timestamp from our last report. + + end + End timestamp from our last report. + + tcp_bytes + TOTAL bytes out of the TCP backend, as reported by haproxy, + when we last reported them back. 
+ + http_bytes + TOTAL bytes out of the HTTP backend, as reported by haproxy, + when we last reported them back. + + unreported_tcp_bytes + TOTAL bytes out of the TCP backend, as reported by haproxy, + when the service was stopped or restarted. + + unreported_http_bytes + TOTAL bytes out of the HTTP backend, as reported by haproxy, + when the service was stopped or restarted. + """ + if None in [start, end]: + raise Exception('Cannot save None value for timestamps') + + if type(start) != datetime.datetime or type(end) != datetime.datetime: + raise TypeError('Timestamps must be datetime.datetime') + + obj = { + self.START_FIELD: str(start), + self.END_FIELD: str(end), + self.LAST_TCP_BYTES_FIELD: tcp_bytes, + self.LAST_HTTP_BYTES_FIELD: http_bytes, + self.UNREPORTED_TCP_BYTES_FIELD: unreported_tcp_bytes, + self.UNREPORTED_HTTP_BYTES_FIELD: unreported_http_bytes + } + self._do_save(obj) + + def read(self): + """ Read the current values from the file """ + if not os.path.exists(self.filename): + return + with open(self.filename, "r") as fp: + self._object = simplejson.load(fp) + + def get_start(self): + """ Return last start timestamp as datetime object """ + if self.START_FIELD in self._object: + return self._format_timestamp(self._object[self.START_FIELD]) + return None + + def get_end(self): + """ Return last end timestamp as datetime object """ + if self.END_FIELD in self._object: + return self._format_timestamp(self._object[self.END_FIELD]) + return None + + def get_unreported_tcp_bytes(self): + """ Return TCP unreported bytes out """ + if self.UNREPORTED_TCP_BYTES_FIELD in self._object: + return int(self._object[self.UNREPORTED_TCP_BYTES_FIELD]) + return 0 + + def get_unreported_http_bytes(self): + """ Return HTTP unreported bytes out """ + if self.UNREPORTED_HTTP_BYTES_FIELD in self._object: + return int(self._object[self.UNREPORTED_HTTP_BYTES_FIELD]) + return 0 + + def get_last_tcp_bytes(self): + """ Return TCP last reported bytes out """ + if 
self.LAST_TCP_BYTES_FIELD in self._object: + return int(self._object[self.LAST_TCP_BYTES_FIELD]) + return 0 + + def get_last_http_bytes(self): + """ Return HTTP last reported bytes out """ + if self.LAST_HTTP_BYTES_FIELD in self._object: + return int(self._object[self.LAST_HTTP_BYTES_FIELD]) + return 0 + + def calculate_new_start(self): + """ + Calculate a new start value for our reporting time range, + which should be just after the last reported end value. If + there is no start value, then we haven't recorded one yet + (i.e., haven't reported any stats yet) so use the current time. + """ + new_start = self.get_end() + if new_start is None: + new_start = datetime.datetime.utcnow() + else: + new_start = new_start + datetime.timedelta(microseconds=1) + return new_start diff --git a/libra/worker/drivers/haproxy/ubuntu_services.py b/libra/worker/drivers/haproxy/ubuntu_services.py index 8e9b9bab..e028de00 100644 --- a/libra/worker/drivers/haproxy/ubuntu_services.py +++ b/libra/worker/drivers/haproxy/ubuntu_services.py @@ -12,16 +12,22 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime import os import subprocess -from libra.common.exc import DeletedStateError -from libra.worker.drivers.haproxy.lbstats import LBStatistics -from libra.worker.drivers.haproxy.services_base import ServicesBase -from libra.worker.drivers.haproxy.query import HAProxyQuery +from oslo.config import cfg + +from libra.common import exc +from libra.openstack.common import log +from libra.worker.drivers.haproxy import query +from libra.worker.drivers.haproxy import services_base +from libra.worker.drivers.haproxy import stats + +LOG = log.getLogger(__name__) -class UbuntuServices(ServicesBase): +class UbuntuServices(services_base.ServicesBase): """ Ubuntu-specific service implementation. 
""" def __init__(self): @@ -29,6 +35,40 @@ class UbuntuServices(ServicesBase): self._config_file = '/etc/haproxy/haproxy.cfg' self._backup_config = self._config_file + '.BKUP' + def _save_unreported(self): + """ + Save current HAProxy totals for an expected restart. + """ + q = query.HAProxyQuery('/var/run/haproxy-stats.socket') + results = q.get_bytes_out() + + stats_file = cfg.CONF['worker:haproxy']['statsfile'] + stats_mgr = stats.StatisticsManager(stats_file) + + # need to carry over current values + start = stats_mgr.get_start() + end = stats_mgr.get_end() + + if None in [start, end]: + start = datetime.datetime.utcnow() + end = start + + tcp_bo = stats_mgr.get_last_tcp_bytes() + http_bo = stats_mgr.get_last_http_bytes() + + curr_tcp_bo = 0 + curr_http_bo = 0 + if 'tcp' in results: + curr_tcp_bo = results['tcp'] + if 'http' in results: + curr_http_bo = results['http'] + + stats_mgr.save(start, end, + tcp_bytes=tcp_bo, + http_bytes=http_bo, + unreported_tcp_bytes=curr_tcp_bo, + unreported_http_bytes=curr_http_bo) + def syslog_restart(self): cmd = '/usr/bin/sudo -n /usr/sbin/service rsyslog restart' try: @@ -38,6 +78,8 @@ class UbuntuServices(ServicesBase): def service_stop(self): """ Stop the HAProxy service on the local machine. """ + self._save_unreported() + cmd = '/usr/bin/sudo -n /usr/sbin/service haproxy stop' try: subprocess.check_output(cmd.split()) @@ -65,6 +107,8 @@ class UbuntuServices(ServicesBase): This assumes that /etc/init.d/haproxy is using the -sf option to the haproxy process. """ + self._save_unreported() + cmd = '/usr/bin/sudo -n /usr/sbin/service haproxy reload' try: subprocess.check_output(cmd.split()) @@ -155,8 +199,7 @@ class UbuntuServices(ServicesBase): This function will query the HAProxy statistics socket and pull out the values that it needs for the given protocol (which equates to one - load balancer). The values are stored in a LBStatistics object that - will be returned to the caller. + load balancer). 
The output of the socket query is in CSV format and defined here: @@ -164,15 +207,76 @@ class UbuntuServices(ServicesBase): """ if not os.path.exists(self._config_file): - raise DeletedStateError("Load balancer is deleted.") + raise exc.DeletedStateError("Load balancer is deleted.") if not os.path.exists(self._haproxy_pid): raise Exception("HAProxy is not running.") - stats = LBStatistics() - query = HAProxyQuery('/var/run/haproxy-stats.socket') + q = query.HAProxyQuery('/var/run/haproxy-stats.socket') + return q.get_server_status(protocol) - node_status_list = query.get_server_status(protocol) - for node, status in node_status_list: - stats.add_node_status(node, status) + def get_statistics(self): + if not os.path.exists(self._config_file): + raise exc.DeletedStateError("Load balancer is deleted.") + if not os.path.exists(self._haproxy_pid): + raise Exception("HAProxy is not running.") - return stats + q = query.HAProxyQuery('/var/run/haproxy-stats.socket') + results = q.get_bytes_out() + + stats_file = cfg.CONF['worker:haproxy']['statsfile'] + stats_mgr = stats.StatisticsManager(stats_file) + + # date range for this report + new_start = stats_mgr.calculate_new_start() + new_end = datetime.datetime.utcnow() + + # previously recorded totals + prev_tcp_bo = stats_mgr.get_last_tcp_bytes() + prev_http_bo = stats_mgr.get_last_http_bytes() + unrpt_tcp_bo = stats_mgr.get_unreported_tcp_bytes() + unrpt_http_bo = stats_mgr.get_unreported_http_bytes() + + # current totals + current_tcp_bo = 0 + current_http_bo = 0 + if 'http' in results: + current_http_bo = results['http'] + if 'tcp' in results: + current_tcp_bo = results['tcp'] + + # If our totals that we previously recorded are greater than the + # totals we have now, and no unreported values, then somehow HAProxy + # was restarted outside of the worker's control, so we have no choice + # but to zero the values to avoid overcharging on usage. 
+ if (unrpt_tcp_bo == 0 and unrpt_http_bo == 0) and \ + ((prev_tcp_bo > current_tcp_bo) or (prev_http_bo > current_http_bo)): + LOG.warn("Forced reset of HAProxy statistics") + prev_tcp_bo = 0 + prev_http_bo = 0 + + # Record totals for each protocol for comparison in the next request. + stats_mgr.save(new_start, new_end, + tcp_bytes=current_tcp_bo, + http_bytes=current_http_bo) + + # We are to deliver the number of bytes out since our last report, + # not the total, so calculate that here. Some examples: + # + # unreported total(A) | prev total(B) | current(C) | returned value + # | | | A + C - B + # --------------------+---------------+------------+--------------- + # 0 | 0 | 200 | 200 + # 0 | 200 | 1500 | 1300 + # 2000 | 1500 | 100 | 600 + + incremental_results = [] + if 'http' in results: + incremental_results.append( + ('http', unrpt_http_bo + current_http_bo - prev_http_bo) + ) + if 'tcp' in results: + incremental_results.append( + ('tcp', unrpt_tcp_bo + current_tcp_bo - prev_tcp_bo) + ) + + return str(new_start), str(new_end), incremental_results