Finish changes to stop an engine

Added a call to stop all react scripts, by passing all known routing
keys to the react_killer function. Then stop the threadpool executor.
Finally, Raise a known exception to stop the watchdog thread. This
will throw an ugly traceback, but will shutdown the engine
gracefully.

Also made minor changes to the example react.json, to change the log
format. Knowing the time a log was printed is useful.

Change-Id: Ibed06f79547312d188feb499f937eb5390d60c3e
This commit is contained in:
Pranesh Pandurangan 2014-06-20 17:39:50 -07:00
parent 857fb52cf1
commit c79d12c645
4 changed files with 31 additions and 18 deletions

View File

@ -18,9 +18,7 @@
import argparse import argparse
import logging import logging
import os import os
import psutil
import tempfile import tempfile
import time
from engine import Engine from engine import Engine
from entropy import utils from entropy import utils
@ -152,13 +150,7 @@ def start_engine(args):
def stop_engine(args): def stop_engine(args):
LOG.info("Stopping engine %s", args.name) LOG.info("Stopping engine %s", args.name)
# Grab engine config file, set our engine to disabled # Grab engine config file, set our engine to disabled
pid = utils.disable_engine(args.name, engine_cfg) utils.disable_engine(args.name, engine_cfg)
try:
p = psutil.Process(pid)
time.sleep(5)
p.terminate()
except psutil.NoSuchProcess:
LOG.exception("No running engine %s", args.name)
def parse(): def parse():

View File

@ -73,8 +73,11 @@ class Engine(object):
self.futures = [] self.futures = []
self.run_queue = collections.deque() self.run_queue = collections.deque()
# Private variables # Private variables
self._watchdog_event_fn = {self.repair_cfg: self.repair_modified, self._watchdog_event_fn = {
self.engine_cfg: self.engine_disabled} self.repair_cfg: self.repair_modified,
self.engine_cfg: self.engine_disabled,
self.audit_cfg: self.audit_modified,
}
# Private variables to keep track of repair scripts. # Private variables to keep track of repair scripts.
self._repairs = [] self._repairs = []
self._known_routing_keys = collections.defaultdict(list) self._known_routing_keys = collections.defaultdict(list)
@ -126,6 +129,7 @@ class Engine(object):
self.start_scheduler() self.start_scheduler()
def start_scheduler(self): def start_scheduler(self):
# Start serializer
if not self._serializer: if not self._serializer:
self._serializer = self.executor.submit(self.start_serializer) self._serializer = self.executor.submit(self.start_serializer)
self.futures.append(self._serializer) self.futures.append(self._serializer)
@ -137,6 +141,7 @@ class Engine(object):
self.futures.extend(self.start_react_scripts( self.futures.extend(self.start_react_scripts(
self._get_react_scripts())) self._get_react_scripts()))
# Start scheduler
self._scheduler = self.executor.submit(self.schedule) self._scheduler = self.executor.submit(self.schedule)
self.futures.append(self._scheduler) self.futures.append(self._scheduler)
self._scheduler.add_done_callback( self._scheduler.add_done_callback(
@ -257,15 +262,32 @@ class Engine(object):
def stop_engine(self): def stop_engine(self):
LOG.info("Stopping engine %s", self.name) LOG.info("Stopping engine %s", self.name)
# Set state to stop, which will stop serializers # Set state to stop, which will stop serializer, and scheduler
self._state = states.DISABLED self._state = states.DISABLED
# Clear run queue # Clear run queue
LOG.info("Clearing audit run queue for %s", self.name) LOG.info("Clearing audit run queue for %s", self.name)
self.run_queue.clear() self.run_queue.clear()
# Stop all repairs - not yet implemented
# Stop all repairs
LOG.info("Stopping all repairs for %s", self.name)
repairs_to_stop = self._known_routing_keys.keys()
self.stop_react_scripts(repairs_to_stop)
# Stop the executor - this is a blocking call.
LOG.info("Shutting down executor for %s", self.name)
self.executor.shutdown()
# Stop watchdog monitoring # Stop watchdog monitoring
LOG.info("Stopping watchdog for %s", self.name) LOG.info("Stopping watchdog for %s", self.name)
self._watchdog_thread.stop() # NOTE(praneshp): Till the watchdog authors respond with the right way
# to stop watchdog, we'll raise something from here. That will stop
# the watchdog thread, go back to the join() in start_scheduler(), and
# quit the program
raise exceptions.EngineStoppedException(
'Fake exception to kill watchdog thread')
def audit_modified(self):
return NotImplemented
def repair_modified(self): def repair_modified(self):
LOG.info('Repair configuration changed') LOG.info('Repair configuration changed')
@ -289,7 +311,7 @@ class Engine(object):
def start_watchdog(self): def start_watchdog(self):
LOG.debug('Watchdog mapping is: ', self._watchdog_event_fn) LOG.debug('Watchdog mapping is: ', self._watchdog_event_fn)
dirs_to_watch = [utils.get_filename_and_path(x)[0] for x in dirs_to_watch = [utils.get_filename_and_path(x)[0] for x in
self.engine_cfg, self.repair_cfg] self.engine_cfg, self.repair_cfg, self.audit_cfg]
return utils.watch_dir_for_change(dirs_to_watch, return utils.watch_dir_for_change(dirs_to_watch,
self._watchdog_event_fn) self._watchdog_event_fn)
@ -323,11 +345,9 @@ class Engine(object):
def stop_react_scripts(self, repairs_to_stop): def stop_react_scripts(self, repairs_to_stop):
# current react scripts # current react scripts
LOG.info("Currently running react scripts: %s", self._repairs)
for repair in repairs_to_stop: for repair in repairs_to_stop:
self.stop_react(repair) self.stop_react(repair)
# react scripts at the end # react scripts at the end
LOG.info("Currently running react scripts: %s", self._repairs)
def stop_react(self, repair): def stop_react(self, repair):
LOG.info("Stopping react script %s", repair) LOG.info("Stopping react script %s", repair)

View File

@ -9,5 +9,5 @@
"mq_password": "guest", "mq_password": "guest",
"routing_key": "demo", "routing_key": "demo",
"log_file": "/home/praneshp/code/entropy/entropy/examples/logs/demo.log", "log_file": "/home/praneshp/code/entropy/entropy/examples/logs/demo.log",
"log_format": "%(filename)s %(lineno)s %(message)s" "log_format": "%(asctime)s %(lineno)s %(message)s"
} }

View File

@ -110,6 +110,7 @@ def watch_dir_for_change(dirs_to_watch, event_fn):
observer = Observer() observer = Observer()
for directory in dirs_to_watch: for directory in dirs_to_watch:
observer.schedule(event_handler, path=directory) observer.schedule(event_handler, path=directory)
observer.setDaemon(True)
observer.start() observer.start()
return observer return observer