Add more steps to stop an engine
Add elements to audit run queue only if engine is enabled. Use engine state as the conditional for running serializer, audits. On stopping the engine, clear the current run queue. Work on blueprint kill-repair-scripts Change-Id: Iba8c94caec9e75f4b6fda8d4bd0bf07b25e596f8
This commit is contained in:
parent
68654b7b86
commit
119595c6c7
|
@ -128,7 +128,7 @@ class Engine(object):
|
||||||
self._watchdog_thread.join()
|
self._watchdog_thread.join()
|
||||||
|
|
||||||
def schedule(self):
|
def schedule(self):
|
||||||
while True:
|
while self._state == self.ENABLED:
|
||||||
(next_time, next_jobs) = self.wait_next(self.engine_timeout)
|
(next_time, next_jobs) = self.wait_next(self.engine_timeout)
|
||||||
# NOTE(praneshp): here, call a function that will wait till next
|
# NOTE(praneshp): here, call a function that will wait till next
|
||||||
# time and call next_jobs,
|
# time and call next_jobs,
|
||||||
|
@ -146,8 +146,8 @@ class Engine(object):
|
||||||
if not self.run_queue:
|
if not self.run_queue:
|
||||||
if watch and watch.expired():
|
if watch and watch.expired():
|
||||||
raise exceptions.TimeoutException(
|
raise exceptions.TimeoutException(
|
||||||
"Died after waiting for audits to arrive for %s" %
|
"Died at %s after waiting for audits to arrive "
|
||||||
watch.elapsed())
|
"for %s" % (utils.wallclock(), watch.elapsed()))
|
||||||
else:
|
else:
|
||||||
# Grab all the jobs for the next time.
|
# Grab all the jobs for the next time.
|
||||||
next_jobs.append(self.run_queue.popleft())
|
next_jobs.append(self.run_queue.popleft())
|
||||||
|
@ -169,12 +169,13 @@ class Engine(object):
|
||||||
now = datetime.datetime.now()
|
now = datetime.datetime.now()
|
||||||
cron = croniter.croniter(schedule, now)
|
cron = croniter.croniter(schedule, now)
|
||||||
next_iteration = cron.get_next(datetime.datetime)
|
next_iteration = cron.get_next(datetime.datetime)
|
||||||
while True:
|
while self._state == self.ENABLED:
|
||||||
LOG.info('It is %s, next serializer at %s', now, next_iteration)
|
LOG.info('It is %s, next serializer at %s', now, next_iteration)
|
||||||
pause.until(next_iteration)
|
pause.until(next_iteration)
|
||||||
now = datetime.datetime.now()
|
now = datetime.datetime.now()
|
||||||
next_iteration = cron.get_next(datetime.datetime)
|
next_iteration = cron.get_next(datetime.datetime)
|
||||||
self.run_serializer(next_iteration, now)
|
if self._state == self.ENABLED:
|
||||||
|
self.run_serializer(next_iteration, now)
|
||||||
|
|
||||||
def run_serializer(self, next_iteration, current_time):
|
def run_serializer(self, next_iteration, current_time):
|
||||||
LOG.info("Running serializer for %s at %s", self.name, current_time)
|
LOG.info("Running serializer for %s at %s", self.name, current_time)
|
||||||
|
@ -201,7 +202,10 @@ class Engine(object):
|
||||||
|
|
||||||
new_additions.sort(key=operator.itemgetter('time'))
|
new_additions.sort(key=operator.itemgetter('time'))
|
||||||
|
|
||||||
self.run_queue.extend(new_additions)
|
# NOTE(praneshp): Protect this operation with a state check, so in
|
||||||
|
# case of race conditions no extra audit scripts are added.
|
||||||
|
if self._state == self.ENABLED:
|
||||||
|
self.run_queue.extend(new_additions)
|
||||||
LOG.info("Run queue till %s is %s", next_iteration, self.run_queue)
|
LOG.info("Run queue till %s is %s", next_iteration, self.run_queue)
|
||||||
LOG.info("Repair scripts at %s: %s", next_iteration, self._repairs)
|
LOG.info("Repair scripts at %s: %s", next_iteration, self._repairs)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -216,8 +220,10 @@ class Engine(object):
|
||||||
def stop_engine(self):
|
def stop_engine(self):
|
||||||
LOG.info("Stopping engine %s", self.name)
|
LOG.info("Stopping engine %s", self.name)
|
||||||
# Set state to stop, which will stop serializers
|
# Set state to stop, which will stop serializers
|
||||||
LOG.info("Setting %s to state: %s", self.name, states.DISABLED)
|
# Clear run queue
|
||||||
self._state = states.DISABLED
|
LOG.info("Clearing audit run queue for %s", self.name)
|
||||||
|
self.run_queue.clear()
|
||||||
|
# Stop all repairs - not yet implemented
|
||||||
# Stop watchdog monitoring
|
# Stop watchdog monitoring
|
||||||
LOG.info("Stopping watchdog for %s", self.name)
|
LOG.info("Stopping watchdog for %s", self.name)
|
||||||
self._watchdog_thread.stop()
|
self._watchdog_thread.stop()
|
||||||
|
@ -236,6 +242,11 @@ class Engine(object):
|
||||||
def setup_audit(self, execution_time, audit_list):
|
def setup_audit(self, execution_time, audit_list):
|
||||||
try:
|
try:
|
||||||
pause.until(execution_time)
|
pause.until(execution_time)
|
||||||
|
# Only proceed if engine is running, i.e in enabled state.
|
||||||
|
if self._state != self.ENABLED:
|
||||||
|
LOG.info("%s is disabled, so not running audits at %s",
|
||||||
|
self.name, execution_time)
|
||||||
|
return
|
||||||
LOG.info("Time: %s, Starting %s", execution_time, audit_list)
|
LOG.info("Time: %s, Starting %s", execution_time, audit_list)
|
||||||
audit_futures = []
|
audit_futures = []
|
||||||
for audit in audit_list:
|
for audit in audit_list:
|
||||||
|
|
Loading…
Reference in New Issue