First push of base for statsd code

Change-Id: Ia817023075f7041a74b402f83abb2f8324a4995a
This commit is contained in:
Andrew Hutchings
2013-04-16 13:40:23 +01:00
parent 58471b561b
commit 0715f0a4f8
9 changed files with 337 additions and 44 deletions

View File

@@ -1,4 +1,4 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P. # Copyright 2013 Hewlett-Packard Development Company, L.P.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain

76
libra/statsd/admin_api.py Normal file
View File

@@ -0,0 +1,76 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import requests
import random
import sys
class AdminAPI(object):
def __init__(self, addresses, logger):
self.logger = logger
random.shuffle(addresses)
for address in addresses:
self.url = 'https://{0}/v1'.format(address)
self.logger.info('Trying {url}'.format(url=self.url))
status, data = self._get('{url}/devices/usage'
.format(url=self.url))
if status:
self.logger.info('API Server is online')
self.online = True
return
# if we get this far all API servers are down
self.online = False
def get_ping_list(self):
# TODO: we need an error list to ping (maybe separate sched?) with
# all clear
marker = 0
limit = 20
lb_list = []
while True:
nodes = self._get_node_list(limit, marker)
# if we hit an empty device list we have hit end of list
if not len(nodes['devices']):
break
for device in nodes['devices']:
if device['status'] == 'ONLINE':
lb_list.append(device)
marker = marker + limit
return lb_list
def _get_node_list(self, limit, marker):
return self._get(
'{url}/devices?marker={marker}&limit={limit}'
.format(url=self.url, marker=marker, limit=limit)
)
def _get(self, url):
try:
r = requests.get(url, verify=False)
except requests.exceptions.RequestException:
self.logger.error('Exception communicating to server: {exc}'
.format(exc=sys.exc_info()[0]))
return False, None
if r.status_code != 200:
self.logger.error('Server returned error {code}'
.format(code=r.status_code))
return False, r.json()
return True, r.json()
def is_online(self):
return self.online

View File

@@ -0,0 +1,13 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,24 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
known_drivers = {
'dummy': 'libra.statsd.drivers.dummy.driver.DummyDriver'
}
class AlertDriver(object):
def __init__(self, logger):
self.logger = logger
def send_alert(self):
raise NotImplementedError()

View File

@@ -0,0 +1,13 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,19 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
from libra.statsd.drivers.base import AlertDriver
class DummyDriver(AlertDriver):
def send_alert(self, message):
self.logger.info('Dummy alert send of {0}'.format(message))

49
libra/statsd/gearman.py Normal file
View File

@@ -0,0 +1,49 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gearman
from libra.common.json_gearman import JSONGearmanClient
class GearJobs(object):
def __init__(self, logger, args):
self.logger = logger
self.gm_client = JSONGearmanClient(args.server)
def send_pings(self, node_list):
list_of_jobs = []
failed_list = []
job_data = {"hpcs_action": "STATS"}
for node in node_list:
list_of_jobs.append(dict(task=node, data=job_data))
submitted_pings = self.gm_client.submit_multiple_jobs(
list_of_jobs, background=False, wait_until_complete=True,
poll_timeout=5.0
)
for ping in submitted_pings:
if ping.state == gearman.JOB_UNKNOWN:
# TODO: Gearman server failed, ignoring for now
self.logger.error('Gearman Job server fail')
continue
if ping.timed_out:
# Ping timeout
failed_list.append(ping['task'])
continue
if ping.result['hpcs_response'] == 'FAIL':
# Error returned by Gearman
failed_list.append(ping['task'])
continue
return failed_list

View File

@@ -1,4 +1,4 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P. # Copyright 2013 Hewlett-Packard Development Company, L.P.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@@ -16,67 +16,49 @@ import daemon
import daemon.pidfile import daemon.pidfile
import daemon.runner import daemon.runner
import lockfile import lockfile
import gearman.errors
import grp import grp
import json
import os import os
import pwd import pwd
import socket
import time import time
from libra.common.json_gearman import JSONGearmanWorker
from libra.common.options import Options, setup_logging from libra.common.options import Options, setup_logging
from libra.openstack.common import importutils
from libra.statsd.drivers.base import known_drivers
from libra.statsd.scheduler import Sched
class CustomJSONGearmanWorker(JSONGearmanWorker): def start(logger, args, drivers):
""" Custom class we will use to pass arguments to the Gearman task. """
logger = None
def handler(worker, job):
""" Main Gearman worker task. """
logger = worker.logger
logger.debug("Received JSON message: %s" % json.dumps(job.data, indent=4))
return {"OK"}
def start(logger, servers):
""" Start the main server processing. """ """ Start the main server processing. """
hostname = socket.gethostname() scheduler = Sched(logger, args, drivers)
task_name = "lbaas-statistics" scheduler.start()
worker = CustomJSONGearmanWorker(servers) while True:
worker.set_client_id(hostname) time.sleep(1)
worker.register_task(task_name, handler)
worker.logger = logger
retry = True
while retry:
try:
worker.work()
except KeyboardInterrupt:
retry = False
except gearman.errors.ServerUnavailable:
logger.error("[statsd] Job server(s) went away. Reconnecting.")
time.sleep(60)
retry = True
except Exception as e:
logger.critical("[statsd] Exception: %s, %s" % (e.__class__, e))
retry = False
logger.info("[statsd] Statistics process terminated.")
def main(): def main():
""" Main Python entry point for the statistics daemon. """ """ Main Python entry point for the statistics daemon. """
drivers = []
options = Options('statsd', 'Statistics Daemon') options = Options('statsd', 'Statistics Daemon')
options.parser.add_argument(
'--driver', dest='driver',
choices=known_drivers.keys(), default='dummy',
help='type of device to use'
)
options.parser.add_argument( options.parser.add_argument(
'--server', dest='server', action='append', metavar='HOST:PORT', '--server', dest='server', action='append', metavar='HOST:PORT',
default=[], default=[],
help='add a Gearman job server to the connection list' help='add a Gearman job server to the connection list'
) )
options.parser.add_argument(
'--ping_interval', type=int, default=60,
help='how often to ping load balancers (in seconds)'
)
options.parser.add_argument(
'--api_server', action='append', metavar='HOST:PORT', default=[],
help='a list of API servers to connect to'
)
args = options.run() args = options.run()
@@ -93,10 +75,28 @@ def main():
svr_list = args.server.split() svr_list = args.server.split()
args.server = svr_list args.server = svr_list
if not args.api_server:
# NOTE(shrews): Can't set a default in argparse method because the
# value is appended to the specified default.
args.api_server.append('localhost:8889')
elif not isinstance(args.api_server, list):
# NOTE(shrews): The Options object cannot intelligently handle
# creating a list from an option that may have multiple values.
# We convert it to the expected type here.
svr_list = args.api_server.split()
args.api_server = svr_list
logger.info("Job server list: %s" % args.server) logger.info("Job server list: %s" % args.server)
logger.info("Selected drivers: {0}".format(args.driver))
if not isinstance(args.driver, list):
args.driver = args.driver.split()
for driver in args.driver:
drivers.append(importutils.import_class(
known_drivers[driver]
))
if args.nodaemon: if args.nodaemon:
start(logger, args.server) start(logger, args, drivers)
else: else:
pidfile = daemon.pidfile.TimeoutPIDLockFile(args.pid, 10) pidfile = daemon.pidfile.TimeoutPIDLockFile(args.pid, 10)
if daemon.runner.is_pidfile_stale(pidfile): if daemon.runner.is_pidfile_stale(pidfile):
@@ -131,4 +131,4 @@ def main():
args.pid args.pid
) )
start(logger, args.server) start(logger, args, drivers)

99
libra/statsd/scheduler.py Normal file
View File

@@ -0,0 +1,99 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import signal
import sys
from libra.statsd.admin_api import AdminAPI
from libra.statsd.gearman import GearJobs
class Sched(object):
def __init__(self, logger, args, drivers):
self.logger = logger
self.args = args
self.drivers = drivers
self.rlock = threading.RLock()
self.ping_timer = None
signal.signal(signal.SIGINT, self.exit_handler)
signal.signal(signal.SIGTERM, self.exit_handler)
def start(self):
self.ping_lbs()
def exit_handler(self, signum, frame):
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
self.shutdown(False)
def shutdown(self, error):
if self.ping_timer:
self.ping_timer.cancel()
if not error:
self.logger.info('Safely shutting down')
sys.exit(0)
else:
self.logger.info('Shutting down due to error')
sys.exit(1)
def ping_lbs(self):
pings = 0
failed = 0
with self.rlock:
try:
pings, failed = self._exec_ping()
except Exception:
self.logger.exception('Uncaught exception during LB ping')
# Need to restart timer after every ping cycle
self.logger.info('{pings} loadbalancers pinged, {failed} failed'
.format(pings=pings, failed=failed))
self.start_ping_sched()
def _exec_ping(self):
pings = 0
failed = 0
node_list = []
self.logger.info('Running ping check')
api = AdminAPI(self.args.api_server, self.logger)
if api.is_online():
lb_list = api.get_ping_list()
pings = len(lb_list)
for lb in lb_list:
node_list.append(lb['name'])
gearman = GearJobs(self.logger, self.args)
failed_nodes = gearman.send_pings(node_list)
failed = len(failed_nodes)
if failed > 0:
self._send_fails(lb_list, failed_nodes)
else:
self.logger.error('No working API server found')
return (0, 0)
return pings, failed
def _send_fails(self, failed_nodes):
# TODO: add message and more node details
for node in failed_nodes:
for driver in self.drivers:
driver.send_alert('Node failed with IP {0}', node)
def start_ping_sched(self):
self.logger.info('LB ping check timer sleeping for {secs} seconds'
.format(secs=self.args.ping_interval))
self.ping_timer = threading.Timer(self.args.ping_interval,
self.ping_lbs, ())
self.ping_timer.start()