Add a culling thread to molteniron
The culling thread will sleep for --polling-seconds and deallocate nodes older than maxTime defined in conf.yaml. Change-Id: If68d335493f37e33d72a0ded1d3c05efa9795ca1 Closes-Bug: 1639960
This commit is contained in:
parent
daed944018
commit
f03e3483b8
|
@ -0,0 +1,191 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
This is the MoltenIron culling thread.
|
||||
"""
|
||||
|
||||
# Copyright (c) 2017 IBM Corporation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# flake8 disabling E242
|
||||
# https://pep8.readthedocs.io/en/latest/intro.html
|
||||
# https://gitlab.com/pycqa/flake8/issues/63
|
||||
# Gah!
|
||||
|
||||
# pylint: disable-msg=C0103
|
||||
# pylint: disable=redefined-outer-name
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import argparse
|
||||
from daemonize import Daemonize
|
||||
from molteniron import molteniron
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import yaml
|
||||
|
||||
|
||||
DEBUG = False
|
||||
DAEMONIZE = True
|
||||
POLLING_SECONDS = 60
|
||||
YAML_CONF = "/usr/local/etc/molteniron/conf.yaml"
|
||||
PID = "/var/run/culler.pid"
|
||||
STOP_FLAG = threading.Event()
|
||||
|
||||
|
||||
class CullingThread(threading.Thread):
|
||||
"""Thread responsible for periodically running a cull operation"""
|
||||
|
||||
def __init__(self, event, yaml_conf, polling_seconds):
|
||||
"""Handles thread initialization"""
|
||||
threading.Thread.__init__(self)
|
||||
|
||||
self.stopped = event
|
||||
|
||||
self.mi = molteniron.MoltenIron()
|
||||
|
||||
conf_parts = yaml_conf.split("/")[:-1]
|
||||
self.conf_dir = "/".join(conf_parts)
|
||||
|
||||
with open(yaml_conf, "r") as fobj:
|
||||
conf = yaml.load(fobj)
|
||||
|
||||
self.maxSeconds = conf["maxTime"]
|
||||
|
||||
self.mi.setup_conf(conf)
|
||||
|
||||
self.polling_seconds = polling_seconds
|
||||
|
||||
def run(self):
|
||||
"""Called when it is time to run"""
|
||||
while not self.stopped.wait(self.polling_seconds):
|
||||
if DEBUG:
|
||||
print("CullingThread:run:loop(%d)" % (self.polling_seconds, ))
|
||||
|
||||
# For example:
|
||||
# args_map = {"output": "json",
|
||||
# "type": "human",
|
||||
# "func": getattr(self.mi, "status"),
|
||||
# "conf_dir": "testenv/etc/molteniron/"}
|
||||
args_map = {
|
||||
"maxSeconds": self.maxSeconds,
|
||||
"func": getattr(self.mi, "cull"),
|
||||
"conf_dir": self.conf_dir
|
||||
}
|
||||
|
||||
# Call the function
|
||||
self.mi.call_function(args_map)
|
||||
|
||||
# Get the result
|
||||
response_map = self.mi.get_response_map()
|
||||
|
||||
try:
|
||||
rc = response_map["status"]
|
||||
except KeyError:
|
||||
msg = ("Error: Server returned: %s and we expected a status " +
|
||||
"somewhere") % (response_map, )
|
||||
print(msg, file=sys.stderr)
|
||||
exit(444)
|
||||
|
||||
if rc != 200:
|
||||
msg = "Error: Status was not 200 %s" % (
|
||||
response_map["message"], )
|
||||
print(msg, file=sys.stderr)
|
||||
exit(rc)
|
||||
|
||||
if DEBUG:
|
||||
print("CullingThread:run:exit")
|
||||
|
||||
|
||||
def signal_handler(signal, frame):
|
||||
"""When a SIGINT or SIGUSR1 is caught, trip our stop event"""
|
||||
if DEBUG:
|
||||
print("signal_handler (%s)" % (signal, ))
|
||||
|
||||
STOP_FLAG.set()
|
||||
|
||||
|
||||
def molteniron_culling():
|
||||
"""This is the main routine for the MoltenIron culling thread."""
|
||||
|
||||
# This is meant to be started from another program but support running
|
||||
# from main as well as a daemon
|
||||
t = CullingThread(STOP_FLAG, YAML_CONF, POLLING_SECONDS)
|
||||
|
||||
t.start()
|
||||
|
||||
while not STOP_FLAG.is_set():
|
||||
time.sleep(60)
|
||||
|
||||
if DEBUG:
|
||||
print("molteniron_culling:exit")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Molteniron culler")
|
||||
parser.add_argument("-c",
|
||||
"--conf-dir",
|
||||
action="store",
|
||||
type=str,
|
||||
dest="conf_dir",
|
||||
help="The directory where configuration is stored")
|
||||
parser.add_argument("-s",
|
||||
"--polling-seconds",
|
||||
action="store",
|
||||
type=str,
|
||||
dest="polling_seconds",
|
||||
help="The amount of seconds to sleep before a poll")
|
||||
parser.add_argument("-p",
|
||||
"--pid-dir",
|
||||
action="store",
|
||||
type=str,
|
||||
dest="pid_dir",
|
||||
help="The directory where PID information is stored")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.polling_seconds:
|
||||
POLLING_SECONDS = int(args.polling_seconds)
|
||||
|
||||
if args.conf_dir:
|
||||
if not os.path.isdir(args.conf_dir):
|
||||
msg = "Error: %s is not a valid directory" % (args.conf_dir, )
|
||||
print(msg, file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
YAML_CONF = os.path.realpath("%s/conf.yaml" % (args.conf_dir, ))
|
||||
|
||||
if args.pid_dir:
|
||||
if not os.path.isdir(args.pid_dir):
|
||||
msg = "Error: %s is not a valid directory" % (args.pid_dir, )
|
||||
print(msg, file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
PID = os.path.realpath("%s/culler.pid" % (args.pid_dir, ))
|
||||
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGUSR1, signal_handler)
|
||||
|
||||
if DAEMONIZE:
|
||||
daemon = Daemonize(app="molteniron_culling",
|
||||
pid=PID,
|
||||
action=molteniron_culling)
|
||||
|
||||
daemon.start()
|
||||
else:
|
||||
molteniron_culling()
|
|
@ -348,6 +348,26 @@ class MoltenIron(object):
|
|||
|
||||
return args
|
||||
|
||||
@command
|
||||
def cull(self, args=None, subparsers=None):
|
||||
"""Culls the database entries older than maxSeconds"""
|
||||
if subparsers is not None:
|
||||
sp = subparsers.add_parser("cull",
|
||||
help="Culls the database entries"
|
||||
" older than maxSeconds")
|
||||
sp.add_argument("-s",
|
||||
"--max-seconds",
|
||||
action="store",
|
||||
type=str,
|
||||
dest="maxSeconds",
|
||||
help="The maximum number of seconds to live")
|
||||
sp.set_defaults(func=self.cull)
|
||||
return
|
||||
|
||||
args['method'] = 'cull'
|
||||
|
||||
return args
|
||||
|
||||
@command
|
||||
def delete_db(self, args=None, subparsers=None):
|
||||
"""Delete all database entries"""
|
||||
|
|
|
@ -175,6 +175,8 @@ def MakeMoltenIronHandlerWithConf(conf):
|
|||
response = database.status_baremetal(request["type"])
|
||||
elif method == 'delete_db':
|
||||
response = database.delete_db()
|
||||
elif method == 'cull':
|
||||
response = database.cull(request['maxSeconds'])
|
||||
database.close()
|
||||
del database
|
||||
except Exception as e:
|
||||
|
@ -803,19 +805,20 @@ class DataBase(object):
|
|||
if node.timestamp in ('', '-1', None):
|
||||
continue
|
||||
|
||||
currentTime = self.to_timestamp(time.gmtime())
|
||||
elapsedTime = currentTime - node.timestamp
|
||||
timestamp1 = time.mktime(time.gmtime())
|
||||
timestamp2 = calendar.timegm(node.timestamp.timetuple())
|
||||
|
||||
elapsedTime = timestamp1 - timestamp2
|
||||
|
||||
if DEBUG:
|
||||
print("currentTime = %s"
|
||||
% (currentTime, ))
|
||||
print("node.timestamp = %s"
|
||||
% (node.timestamp, ))
|
||||
print("timestamp1 = %s"
|
||||
% (timestamp1, ))
|
||||
print("timestamp2 = %s"
|
||||
% (timestamp2, ))
|
||||
print("elapsedTime = %s"
|
||||
% (elapsedTime, ))
|
||||
print("elapsedTime.seconds = %s"
|
||||
% (elapsedTime.seconds, ))
|
||||
|
||||
if elapsedTime.seconds < int(maxSeconds):
|
||||
if elapsedTime < int(maxSeconds):
|
||||
continue
|
||||
|
||||
logstring = ("node %d has been allocated for too long."
|
||||
|
|
Loading…
Reference in New Issue